Introspection

Discover an RPC service's methods, schemas, and parameter types at runtime or statically.

Usage

Static introspection (no server needed)

Use rpc_methods() to inspect a Protocol class, or describe_rpc() for a human-readable summary:

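from vgi_rpc import describe_rpc, rpc_methods

# Calculator is your own Protocol class, defined elsewhere.
methods = rpc_methods(Calculator)
for name, info in methods.items():
    print(f"{name}: {info.method_type.value}, params={info.params_schema}")

# Human-readable summary of the same information
print(describe_rpc(Calculator))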
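To make the idea of static introspection concrete, the following is a minimal standard-library sketch of how public methods and their type hints can be discovered on a Protocol class without running a server. It is not the vgi_rpc implementation: the Calculator protocol and the list_methods helper are hypothetical names used only for illustration, and the real rpc_methods() additionally builds Arrow schemas and classifies each method.

```python
import inspect
from typing import Protocol, get_type_hints


class Calculator(Protocol):
    """Hypothetical protocol used only for this illustration."""

    def add(self, a: float, b: float) -> float:
        """Return the sum of a and b."""
        ...

    def negate(self, x: float) -> float:
        """Return x with its sign flipped."""
        ...


def list_methods(protocol: type) -> dict:
    """Map each public method name to its resolved type hints."""
    found = {}
    for name in dir(protocol):
        if name.startswith("_"):
            continue  # skip private and dunder names
        attr = getattr(protocol, name, None)
        if attr is None or not callable(attr):
            continue  # skip plain attributes
        found[name] = get_type_hints(attr)
    return found


methods = list_methods(Calculator)
for name, hints in methods.items():
    # The signature and the docstring are both available without any server.
    print(name, inspect.signature(getattr(Calculator, name)), hints)
```

Because everything is read from the class object itself, this kind of inspection works at import time and needs no transport.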

Runtime introspection (over a connection)

Enable the built-in __describe__ method on the server, then query it from any client:

from vgi_rpc import RpcServer, introspect, connect

# Server: enable describe
server = RpcServer(Calculator, CalculatorImpl(), enable_describe=True)

# Client: query over pipe/subprocess
with connect(Calculator, ["python", "worker.py"]) as proxy:
    desc = introspect(proxy._transport)
    for name, method in desc.methods.items():
        print(f"{name}: {method.method_type.value}")

# Client: query over HTTP
from vgi_rpc import http_introspect
desc = http_introspect("http://localhost:8080")

The ServiceDescription contains method metadata, parameter schemas, default values, and docstrings: everything a dynamic client needs to call the service without access to the Python Protocol class.

For stream methods that declare a header type (Stream[S, H]), MethodDescription includes has_header (bool) and header_schema (pa.Schema | None) fields describing the header's Arrow schema. For methods without a header, has_header is False and header_schema is None.
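As an illustration of how a header type can be recovered from a two-parameter generic annotation, here is a small sketch using typing.get_origin and typing.get_args. The Stream, Progress, and JobHeader classes below are hypothetical stand-ins defined only for this example, and the dataclass check stands in for the library's own header-type check.

```python
from dataclasses import dataclass, is_dataclass
from typing import Generic, Optional, TypeVar, get_args, get_origin

S = TypeVar("S")
H = TypeVar("H")


class Stream(Generic[S, H]):
    """Hypothetical stand-in for a stream return type with state S and header H."""


@dataclass
class Progress:
    """Hypothetical stream state type."""
    done: int


@dataclass
class JobHeader:
    """Hypothetical header type sent once before the stream data."""
    job_id: str


def header_type_of(return_hint: object) -> Optional[type]:
    """Return the header class from a Stream[S, H] annotation, or None."""
    if get_origin(return_hint) is not Stream:
        return None  # not a stream annotation at all
    args = get_args(return_hint)
    if len(args) >= 2 and isinstance(args[1], type) and is_dataclass(args[1]):
        return args[1]
    return None


header = header_type_of(Stream[Progress, JobHeader])
has_header = header is not None
print(has_header, header)
print(header_type_of(float))
```

In this sketch, has_header is simply whether a header class was found, which mirrors the False / None pairing described above.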

API Reference

Functions

introspect

introspect(
    transport: RpcTransport,
    ipc_validation: IpcValidation = FULL,
) -> ServiceDescription

Send a __describe__ request over any RpcTransport.

PARAMETER DESCRIPTION
transport

An open RpcTransport.

TYPE: RpcTransport

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

RETURNS DESCRIPTION
ServiceDescription

A ServiceDescription with all method metadata.

RAISES DESCRIPTION
RpcError

If the server does not support introspection or returns an error.

Source code in vgi_rpc/introspect.py
def introspect(
    transport: RpcTransport,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> ServiceDescription:
    """Send a ``__describe__`` request over any ``RpcTransport``.

    Args:
        transport: An open ``RpcTransport``.
        ipc_validation: Validation level for incoming IPC batches.

    Returns:
        A ``ServiceDescription`` with all method metadata.

    Raises:
        RpcError: If the server does not support introspection or returns
            an error.

    """
    # Write a minimal request: empty params, method = __describe__
    request_metadata = pa.KeyValueMetadata(
        {RPC_METHOD_KEY: DESCRIBE_METHOD_NAME.encode(), REQUEST_VERSION_KEY: REQUEST_VERSION}
    )
    with ipc.new_stream(transport.writer, _EMPTY_SCHEMA) as writer:
        empty_batch = pa.RecordBatch.from_pydict({}, schema=_EMPTY_SCHEMA)
        writer.write_batch(empty_batch, custom_metadata=request_metadata)

    # Read response
    reader = ValidatedReader(ipc.open_stream(transport.reader), ipc_validation)
    # Skip log batches, collect the data batch
    while True:
        batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
        if not _dispatch_log_or_error(batch, custom_metadata):
            break
    _drain_stream(reader)

    return parse_describe_batch(batch, custom_metadata)
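The response-reading part of introspect() follows a common pattern: skip log messages, stop at the first data message, then drain whatever remains so the transport is left clean. The sketch below shows that control flow with plain Python tuples instead of Arrow batches. The message shape, the "kind" metadata key, and the read_describe_reply name are all assumptions made for this example and are not part of vgi_rpc.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical message shape: (payload, metadata)
Message = Tuple[str, Dict[str, str]]


def read_describe_reply(messages: Iterable[Message]) -> Message:
    """Return the first data message, handling logs and errors on the way."""
    it = iter(messages)
    reply: Optional[Message] = None
    for payload, metadata in it:
        kind = metadata.get("kind")
        if kind == "log":
            print(f"server log: {payload}")  # surface the log, keep reading
            continue
        if kind == "error":
            raise RuntimeError(payload)  # a server-side error aborts the call
        reply = (payload, metadata)
        break
    if reply is None:
        raise RuntimeError("stream ended without a data message")
    for _ in it:
        pass  # drain trailing messages so the next request starts clean
    return reply


stream = iter([
    ("starting describe", {"kind": "log"}),
    ("describe-data", {"kind": "data"}),
    ("trailing", {"kind": "log"}),
])
reply = read_describe_reply(stream)
print(reply)
```

Draining matters when the same transport is reused: leftover messages from one call would otherwise be read as the start of the next response.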

rpc_methods cached

rpc_methods(protocol: type) -> Mapping[str, RpcMethodInfo]

Introspect a Protocol class and return RpcMethodInfo for each method.

Skips underscore-prefixed names and non-callable attributes.

Source code in vgi_rpc/rpc/_types.py
@functools.lru_cache(maxsize=64)
def rpc_methods(protocol: type) -> Mapping[str, RpcMethodInfo]:
    """Introspect a Protocol class and return RpcMethodInfo for each method.

    Skips underscore-prefixed names and non-callable attributes.
    """
    result: dict[str, RpcMethodInfo] = {}

    # Get method names from Protocol — look at annotations and callables
    for name in dir(protocol):
        if name.startswith("_"):
            continue
        attr = getattr(protocol, name, None)
        if attr is None or not callable(attr):
            continue

        try:
            method_hints = get_type_hints(attr, include_extras=True)
        except (NameError, AttributeError) as exc:
            raise TypeError(f"Failed to resolve type hints for {protocol.__name__}.{name}(): {exc}") from exc

        _validate_protocol_params(protocol, name, inspect.signature(attr))

        return_hint = method_hints.get("return", type(None))
        method_type, result_type, has_return = _classify_return_type(return_hint)

        # For unary methods, build result schema from the result type
        result_schema = _build_result_schema(result_type) if method_type == MethodType.UNARY else _EMPTY_SCHEMA

        # Extract header type from Stream[S, H] annotations
        header_type: type[ArrowSerializableDataclass] | None = None
        if method_type == MethodType.STREAM:
            stream_args = get_args(return_hint)
            if len(stream_args) >= 2:
                h_arg = stream_args[1]
                if isinstance(h_arg, type) and issubclass(h_arg, ArrowSerializableDataclass):
                    header_type = h_arg

        param_defaults = _get_param_defaults(protocol, name)
        params_schema = _build_params_schema(method_hints)
        param_types = {k: v for k, v in method_hints.items() if k not in ("self", "return")}

        doc = getattr(attr, "__doc__", None)

        result[name] = RpcMethodInfo(
            name=name,
            params_schema=params_schema,
            result_schema=result_schema,
            result_type=result_type,
            method_type=method_type,
            has_return=has_return,
            doc=doc,
            param_defaults=param_defaults,
            param_types=param_types,
            header_type=header_type,
        )

    return MappingProxyType(result)
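Two properties of rpc_methods() follow directly from the source above: results are cached per class by functools.lru_cache, and the returned mapping is a read-only MappingProxyType. The sketch below demonstrates both behaviors with a simplified stand-in. The describe function and the Greeter class are hypothetical and only mimic the shape of the real function.

```python
import functools
from types import MappingProxyType
from typing import Mapping


@functools.lru_cache(maxsize=64)
def describe(protocol: type) -> Mapping[str, str]:
    """Hypothetical stand-in: map public callable names to their docstrings."""
    result = {
        name: (getattr(protocol, name).__doc__ or "")
        for name in dir(protocol)
        if not name.startswith("_") and callable(getattr(protocol, name))
    }
    return MappingProxyType(result)


class Greeter:
    def hello(self) -> str:
        """Say hello."""
        return "hello"


first = describe(Greeter)
second = describe(Greeter)  # served from the cache, no second scan
print(first is second)

try:
    first["extra"] = "nope"  # the proxy rejects mutation
except TypeError as exc:
    print(f"read-only: {exc}")
```

One consequence of caching by class object is that attributes added to a class after the first call are not reflected in later results for that class.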

describe_rpc

describe_rpc(
    protocol: type,
    *,
    methods: Mapping[str, RpcMethodInfo] | None = None
) -> str

Return a human-readable description of an RPC protocol's methods.

Source code in vgi_rpc/rpc/__init__.py
def describe_rpc(protocol: type, *, methods: Mapping[str, RpcMethodInfo] | None = None) -> str:
    """Return a human-readable description of an RPC protocol's methods."""
    if methods is None:
        methods = rpc_methods(protocol)
    lines: list[str] = [f"RPC Protocol: {protocol.__name__}", ""]

    for name, info in sorted(methods.items()):
        lines.append(f"  {name}({info.method_type.value})")
        lines.append(f"    params: {info.params_schema}")
        if info.method_type == MethodType.UNARY:
            lines.append(f"    result: {info.result_schema}")
        if info.doc:
            lines.append(f"    doc: {info.doc.strip()}")
        lines.append("")

    return "\n".join(lines)

Data Classes

ServiceDescription dataclass

ServiceDescription(
    protocol_name: str,
    request_version: str,
    describe_version: str,
    server_id: str,
    methods: Mapping[str, MethodDescription],
)

Complete description of an RPC service from introspection.

ATTRIBUTE DESCRIPTION
protocol_name

Name of the Protocol class.

TYPE: str

request_version

Wire protocol version.

TYPE: str

describe_version

Introspection format version.

TYPE: str

server_id

Server instance identifier.

TYPE: str

methods

Mapping of method name to MethodDescription.

TYPE: Mapping[str, MethodDescription]

__str__

__str__() -> str

Return a human-readable summary of the service.

Source code in vgi_rpc/introspect.py
def __str__(self) -> str:
    """Return a human-readable summary of the service."""
    lines: list[str] = [
        f"RPC Service: {self.protocol_name}",
        f"  server_id: {self.server_id}",
        f"  request_version: {self.request_version}",
        f"  describe_version: {self.describe_version}",
        "",
    ]
    for name, md in sorted(self.methods.items()):
        lines.append(f"  {name}({md.method_type.value})")
        if md.param_types:
            params_str = ", ".join(f"{k}: {v}" for k, v in md.param_types.items())
            lines.append(f"    params: {params_str}")
        if md.has_return:
            lines.append(f"    returns: {md.result_schema}")
        if md.doc:
            lines.append(f"    doc: {md.doc.strip()}")
        lines.append("")
    return "\n".join(lines)

MethodDescription dataclass

MethodDescription(
    name: str,
    method_type: MethodType,
    doc: str | None,
    has_return: bool,
    params_schema: Schema,
    result_schema: Schema,
    param_types: dict[str, str] = dict(),
    param_defaults: dict[str, object] = dict(),
    has_header: bool = False,
    header_schema: Schema | None = None,
)

Description of a single RPC method from introspection.

For STREAM methods, result_schema reflects the Protocol-level return type (always empty). The actual stream output schema is determined at runtime by the implementation and cannot be reported statically.

ATTRIBUTE DESCRIPTION
name

Method name as it appears on the Protocol.

TYPE: str

method_type

Whether this is UNARY or STREAM.

TYPE: MethodType

doc

The method's docstring, or None.

TYPE: str | None

has_return

True for unary methods that return a value.

TYPE: bool

params_schema

Arrow schema for request parameters.

TYPE: Schema

result_schema

Arrow schema for the response (unary) or empty (streams).

TYPE: Schema

param_types

Human-readable type names keyed by parameter name.

TYPE: dict[str, str]

param_defaults

Parsed default values keyed by parameter name.

TYPE: dict[str, object]

has_header

True for stream methods that declare a header type.

TYPE: bool

header_schema

Arrow schema for the header, or None if no header.

TYPE: Schema | None
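A dynamic client can use param_types and param_defaults to assemble a complete argument set before sending a request. The following sketch shows one way to do that. MethodSketch is a hypothetical stand-in carrying a subset of MethodDescription's fields, build_arguments is an illustrative helper rather than a vgi_rpc function, and the sketch assumes that param_types lists every parameter of the method.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class MethodSketch:
    """Hypothetical stand-in with a subset of MethodDescription's fields."""
    name: str
    param_types: dict = field(default_factory=dict)
    param_defaults: dict = field(default_factory=dict)


def build_arguments(method: MethodSketch, supplied: dict) -> dict:
    """Merge caller-supplied values with declared defaults, in declared order."""
    unknown = set(supplied) - set(method.param_types)
    if unknown:
        raise TypeError(f"{method.name}() got unexpected parameters: {sorted(unknown)}")
    merged = {**method.param_defaults, **supplied}  # supplied values win
    missing = [p for p in method.param_types if p not in merged]
    if missing:
        raise TypeError(f"{method.name}() missing required parameters: {missing}")
    return {p: merged[p] for p in method.param_types}


scale = MethodSketch(
    name="scale",
    param_types={"value": "float", "factor": "float"},
    param_defaults={"factor": 2.0},
)
args = build_arguments(scale, {"value": 3.0})
print(args)
```

Validating names and required parameters on the client side gives an early, local error instead of a round trip to the server.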

Constants

DESCRIBE_METHOD_NAME module-attribute

DESCRIBE_METHOD_NAME = '__describe__'

Well-known method name for introspection requests.