Transports

All transports implement the RpcTransport protocol (a readable + writable byte stream). Your service code is identical regardless of transport — only the setup differs.
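The protocol surface is small. A minimal structural sketch of the shape this page describes (a readable stream, a writable stream, and a close method); the names mirror the properties documented below, but the real definition lives in vgi_rpc:

```python
import io
from io import IOBase
from typing import Protocol, runtime_checkable


@runtime_checkable
class TransportLike(Protocol):
    """Sketch of the RpcTransport surface; an illustration, not the real definition."""

    @property
    def reader(self) -> IOBase: ...  # readable binary stream

    @property
    def writer(self) -> IOBase: ...  # writable binary stream

    def close(self) -> None: ...


class DummyTransport:
    """Anything exposing reader/writer/close satisfies the protocol."""

    def __init__(self, reader: IOBase, writer: IOBase) -> None:
        self.reader = reader
        self.writer = writer

    def close(self) -> None:
        self.reader.close()
        self.writer.close()


t = DummyTransport(io.BytesIO(b"in"), io.BytesIO())
ok = isinstance(t, TransportLike)  # structural check at runtime
```

Because the check is structural, any object with those three members works, which is what lets the same service code run over pipes, subprocesses, sockets, or shared memory.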

Choosing a Transport

| Transport | Use case | Latency | Setup |
| --- | --- | --- | --- |
| serve_pipe | Tests, demos, embedded | Lowest (in-process) | One line |
| connect / SubprocessTransport | Isolated workers, CLI tools | Low (stdin/stdout) | Spawn a child process |
| serve_unix / unix_connect | Local IPC, long-lived services | Low (Unix socket) | Socket path |
| ShmPipeTransport | Co-located processes, large batches | Lowest (zero-copy) | Shared memory segment |
| http_connect / make_wsgi_app | Network services, browser clients | Higher (HTTP) | WSGI server + client |

Pipe (in-process, for tests)

from vgi_rpc import serve_pipe

with serve_pipe(MyService, MyServiceImpl()) as proxy:
    result = proxy.add(a=1.0, b=2.0)  # proxy is typed as MyService

Subprocess

# worker.py
from vgi_rpc import run_server
run_server(MyService, MyServiceImpl())

# client.py
from vgi_rpc import connect
with connect(MyService, ["python", "worker.py"]) as proxy:
    result = proxy.add(a=1.0, b=2.0)

Unix domain socket

Low-latency local IPC without subprocess management. The server listens on a socket path; clients connect by path. Not available on Windows.

Server entry point:

# worker.py
from vgi_rpc.rpc import RpcServer, serve_unix

server = RpcServer(MyService, MyServiceImpl())
serve_unix(server, "/tmp/my-service.sock")

Client:

from vgi_rpc.rpc import unix_connect

with unix_connect(MyService, "/tmp/my-service.sock") as proxy:
    result = proxy.add(a=1.0, b=2.0)

For in-process testing, serve_unix_pipe starts the server on a background thread with an auto-generated socket path:

from vgi_rpc.rpc import serve_unix_pipe

with serve_unix_pipe(MyService, MyServiceImpl()) as proxy:
    result = proxy.add(a=1.0, b=2.0)

Use threaded=True on serve_unix to handle multiple concurrent clients (each connection gets its own thread):

serve_unix(server, "/tmp/my-service.sock", threaded=True)

Shared memory

Wraps a PipeTransport with a shared memory side-channel. When a batch fits in the segment, only a small pointer is sent over the pipe — the receiver reads data directly from shared memory:

from vgi_rpc import ShmPipeTransport, make_pipe_pair
from vgi_rpc.shm import ShmSegment

shm = ShmSegment.create(size=100 * 1024 * 1024)  # 100 MB
client_pipe, server_pipe = make_pipe_pair()
client_transport = ShmPipeTransport(client_pipe, shm)
server_transport = ShmPipeTransport(server_pipe, shm)

Falls back to normal pipe IPC for batches that exceed the segment size.
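The mechanism can be illustrated with the stdlib's multiprocessing.shared_memory. This is a sketch of the idea only; vgi_rpc's actual descriptor format and segment layout are not shown on this page. The sender places the payload in the segment and sends only a small descriptor over the pipe, and the receiver attaches by name and reads in place:

```python
from multiprocessing import shared_memory

# Sender side: copy the payload into the segment once.
seg = shared_memory.SharedMemory(create=True, size=1024)
payload = b"one large Arrow batch, serialized"
seg.buf[: len(payload)] = payload

# Over the pipe, only a tiny descriptor travels (hypothetical format).
descriptor = (seg.name, 0, len(payload))

# Receiver side: attach to the same segment and read in place.
name, offset, length = descriptor
peer = shared_memory.SharedMemory(name=name)
received = bytes(peer.buf[offset : offset + length])
peer.close()

seg.close()
seg.unlink()  # the creator cleans up, mirroring "caller manages segment lifecycle"
```

Note how neither side sends the payload itself over the pipe; this is why the approach pays off for large batches and why the transport deliberately does not own the segment.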

API Reference

PipeTransport

PipeTransport(reader: IOBase, writer: IOBase)

Transport backed by file-like IO streams (e.g. from os.pipe()).

Initialize with reader and writer streams.

Source code in vgi_rpc/rpc/_transport.py
def __init__(self, reader: IOBase, writer: IOBase) -> None:
    """Initialize with reader and writer streams."""
    self._reader = reader
    self._writer = writer

reader property

reader: IOBase

Readable binary stream.

writer property

writer: IOBase

Writable binary stream.

close

close() -> None

Close both streams.

Source code in vgi_rpc/rpc/_transport.py
def close(self) -> None:
    """Close both streams."""
    self._reader.close()
    self._writer.close()

SubprocessTransport

SubprocessTransport(
    cmd: list[str],
    *,
    stderr: StderrMode = INHERIT,
    stderr_logger: Logger | None = None
)

Transport that communicates with a child process over stdin/stdout.

Spawns a command via subprocess.Popen with stdin=PIPE, stdout=PIPE, and configurable stderr handling via StderrMode.

The writer (child's stdin) is kept unbuffered (bufsize=0) so IPC data is flushed immediately. The reader (child's stdout) is wrapped in a BufferedReader because Arrow IPC expects read(n) to return exactly n bytes, but raw FileIO.read(n) on a pipe may return fewer (POSIX short-read semantics).
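The short-read behaviour is easy to reproduce with a plain pipe: a raw read returns whatever bytes happen to be available, while a BufferedReader keeps reading until it has the requested count (or hits EOF).

```python
import os
import threading
import time

# Raw read: only 3 bytes are in the pipe, so read(10) returns 3.
r1, w1 = os.pipe()
os.write(w1, b"abc")
short = os.read(r1, 10)  # b"abc": a POSIX short read
os.close(r1)
os.close(w1)

# Buffered read: read(10) blocks until 10 bytes have arrived (or EOF).
r2, w2 = os.pipe()

def writer() -> None:
    os.write(w2, b"abc")
    time.sleep(0.05)  # the rest arrives later, as on a real pipe
    os.write(w2, b"defghij")
    os.close(w2)

threading.Thread(target=writer).start()
buffered = os.fdopen(r2, "rb")  # os.fdopen("rb") returns a BufferedReader
exact = buffered.read(10)  # b"abcdefghij": the full 10 bytes
buffered.close()
```

This is exactly why the child's stdout is wrapped in a BufferedReader: Arrow IPC framing assumes read(n) yields n bytes.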

Spawn the subprocess and wire up stdin/stdout as the transport.

Parameters:

- cmd (list[str]): Command to spawn.
- stderr (StderrMode, default: INHERIT): How to handle the child's stderr stream.
- stderr_logger (Logger | None, default: None): Logger for StderrMode.PIPE output. Defaults to logging.getLogger("vgi_rpc.subprocess.stderr").

Source code in vgi_rpc/rpc/_transport.py
def __init__(
    self,
    cmd: list[str],
    *,
    stderr: StderrMode = StderrMode.INHERIT,
    stderr_logger: logging.Logger | None = None,
) -> None:
    """Spawn the subprocess and wire up stdin/stdout as the transport.

    Args:
        cmd: Command to spawn.
        stderr: How to handle the child's stderr stream.
        stderr_logger: Logger for ``StderrMode.PIPE`` output.
            Defaults to ``logging.getLogger("vgi_rpc.subprocess.stderr")``.

    """
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug(
            "SubprocessTransport init: cmd=%s, stderr=%s",
            cmd,
            stderr.value,
        )

    if stderr == StderrMode.DEVNULL:
        stderr_arg: int | None = subprocess.DEVNULL
    elif stderr == StderrMode.PIPE:
        stderr_arg = subprocess.PIPE
    else:
        stderr_arg = None

    self._proc = subprocess.Popen(
        cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=stderr_arg,
        bufsize=0,
    )
    assert self._proc.stdout is not None
    assert self._proc.stdin is not None
    self._reader: IOBase = os.fdopen(self._proc.stdout.fileno(), "rb", closefd=False)
    self._writer: IOBase = cast(IOBase, self._proc.stdin)
    self._closed = False
    self._stderr_thread: threading.Thread | None = None
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug(
            "SubprocessTransport spawned: pid=%d, stdin_fd=%d, stdout_fd=%d",
            self._proc.pid,
            self._proc.stdin.fileno(),
            self._proc.stdout.fileno(),
        )

    if stderr == StderrMode.PIPE:
        assert self._proc.stderr is not None
        if stderr_logger is None:
            stderr_logger = logging.getLogger("vgi_rpc.subprocess.stderr")
        self._stderr_thread = threading.Thread(
            target=_drain_stderr,
            args=(self._proc.stderr, stderr_logger),
            daemon=True,
        )
        self._stderr_thread.start()

proc property

proc: Popen[bytes]

The underlying Popen process.

reader property

reader: IOBase

Readable binary stream (child's stdout, buffered).

writer property

writer: IOBase

Writable binary stream (child's stdin, unbuffered).

close

close() -> None

Close stdin (sends EOF), wait for exit, close stdout.

Source code in vgi_rpc/rpc/_transport.py
def close(self) -> None:
    """Close stdin (sends EOF), wait for exit, close stdout."""
    if self._closed:
        return
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug("SubprocessTransport closing: pid=%d", self._proc.pid)
    self._closed = True
    if self._proc.stdin:
        self._proc.stdin.close()
    try:
        self._proc.wait(timeout=10)
    except subprocess.TimeoutExpired:
        self._proc.kill()
        self._proc.wait()
    if self._stderr_thread is not None:
        self._stderr_thread.join(timeout=5)
    self._reader.close()
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug(
            "SubprocessTransport closed: pid=%d, exit_code=%s",
            self._proc.pid,
            self._proc.returncode,
        )

ShmPipeTransport

ShmPipeTransport(pipe: PipeTransport, shm: ShmSegment)

Pipe transport with shared memory side-channel for batch data.

Does NOT own the ShmSegment — caller manages segment lifecycle. Closing the transport closes the pipe only.

Initialize with a pipe transport and a shared memory segment.

Source code in vgi_rpc/rpc/_transport.py
def __init__(self, pipe: PipeTransport, shm: ShmSegment) -> None:
    """Initialize with a pipe transport and a shared memory segment."""
    self._pipe = pipe
    self._shm = shm

reader property

reader: IOBase

Readable binary stream (delegated to pipe).

writer property

writer: IOBase

Writable binary stream (delegated to pipe).

shm property

shm: ShmSegment

The shared memory segment.

close

close() -> None

Close the pipe transport (does NOT close/unlink shm).

Source code in vgi_rpc/rpc/_transport.py
def close(self) -> None:
    """Close the pipe transport (does NOT close/unlink shm)."""
    self._pipe.close()

StderrMode

Bases: Enum

How to handle child process stderr in SubprocessTransport.

Members

- INHERIT: Child stderr goes to parent's stderr (default).
- PIPE: Parent drains child stderr via a daemon thread and forwards each line to a logging.Logger.
- DEVNULL: Child stderr discarded at the OS level.
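The PIPE mode's behaviour can be sketched with a file-like stream and a capturing logger. Here drain_lines is a hypothetical stand-in for the internal drain helper, and the log level vgi_rpc actually uses is not shown on this page:

```python
import io
import logging

def drain_lines(stream: io.BufferedIOBase, logger: logging.Logger) -> None:
    # Forward each stderr line to the logger, one record per line,
    # mirroring what StderrMode.PIPE's daemon thread does.
    for line in stream:
        logger.info(line.decode(errors="replace").rstrip("\n"))

records: list[str] = []
handler = logging.Handler()
handler.emit = lambda record: records.append(record.getMessage())  # capture instead of print
logger = logging.getLogger("demo.subprocess.stderr")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.propagate = False

drain_lines(io.BytesIO(b"first line\nsecond line\n"), logger)
```

Routing stderr through a logger rather than inheriting it lets the parent timestamp, filter, or redirect child diagnostics like any other log output.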

Utility Functions

make_pipe_pair

make_pipe_pair() -> tuple[PipeTransport, PipeTransport]

Create connected client/server transports using os.pipe().

Returns (client_transport, server_transport).

Source code in vgi_rpc/rpc/_transport.py
def make_pipe_pair() -> tuple[PipeTransport, PipeTransport]:
    """Create connected client/server transports using os.pipe().

    Returns (client_transport, server_transport).
    """
    c2s_r, c2s_w = os.pipe()
    s2c_r, s2c_w = os.pipe()
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug(
            "make_pipe_pair: c2s=(%d,%d), s2c=(%d,%d)",
            c2s_r,
            c2s_w,
            s2c_r,
            s2c_w,
        )
    client = PipeTransport(
        os.fdopen(s2c_r, "rb"),
        os.fdopen(c2s_w, "wb", buffering=0),
    )
    server = PipeTransport(
        os.fdopen(c2s_r, "rb"),
        os.fdopen(s2c_w, "wb", buffering=0),
    )
    return client, server

serve_stdio

serve_stdio(server: RpcServer) -> None

Serve RPC requests over stdin/stdout.

This is the server-side entry point for subprocess mode. The reader uses default buffering so that read(n) returns exactly n bytes (Arrow IPC requires this; raw FileIO.read(n) may short-read on pipes). The writer is unbuffered (buffering=0) so IPC data is flushed immediately. Uses closefd=False so the original stdio descriptors are not closed on exit.

Emits a diagnostic warning to stderr when stdin or stdout is connected to a terminal, since the process expects binary Arrow IPC data.
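The effect of closefd=False can be seen with a plain pipe: closing the Python wrapper leaves the underlying descriptor usable, which is what keeps the process's real stdio descriptors alive after serve_stdio returns.

```python
import os

r_fd, w_fd = os.pipe()

wrapper = os.fdopen(w_fd, "wb", buffering=0, closefd=False)
wrapper.write(b"hi")
wrapper.close()  # closes only the Python wrapper, not the descriptor

os.write(w_fd, b"!")  # the raw descriptor is still open and writable
os.close(w_fd)

data = os.read(r_fd, 10)
os.close(r_fd)
```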

Source code in vgi_rpc/rpc/_transport.py
def serve_stdio(server: RpcServer) -> None:
    """Serve RPC requests over stdin/stdout.

    This is the server-side entry point for subprocess mode.  The reader
    uses default buffering so that ``read(n)`` returns exactly *n* bytes
    (Arrow IPC requires this; raw ``FileIO.read(n)`` may short-read on
    pipes).  The writer is unbuffered (``buffering=0``) so IPC data is
    flushed immediately.  Uses ``closefd=False`` so the original stdio
    descriptors are not closed on exit.

    Emits a diagnostic warning to stderr when stdin or stdout is connected
    to a terminal, since the process expects binary Arrow IPC data.
    """
    if sys.stdin.isatty() or sys.stdout.isatty():
        sys.stderr.write(
            "WARNING: This process communicates via Arrow IPC on stdin/stdout "
            "and is not intended to be run interactively.\n"
            "It should be launched as a subprocess by an RPC client "
            "(e.g. vgi_rpc.connect()).\n"
        )
    reader = os.fdopen(sys.stdin.fileno(), "rb", closefd=False)
    writer = os.fdopen(sys.stdout.fileno(), "wb", buffering=0, closefd=False)
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug(
            "serve_stdio: server_id=%s, protocol=%s",
            server.server_id,
            server.protocol_name,
        )
    transport = PipeTransport(reader, writer)
    server.serve(transport)

serve_unix

serve_unix(
    server: RpcServer,
    path: str,
    *,
    threaded: bool = False,
    max_connections: int | None = None
) -> None

Serve RPC on a Unix domain socket, accepting connections in a loop.

Binds to path, listens, and accepts connections. By default connections are handled sequentially (one at a time). With threaded=True each accepted connection is served in its own daemon thread, allowing multiple clients to use the same socket concurrently.

Note: when threaded=True the implementation object passed to RpcServer is shared across threads. If it carries mutable state the caller must ensure thread-safety (e.g. via locks). Per-connection stream state (StreamState) is always isolated.
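A minimal pattern for that thread-safety note, using a hypothetical CounterImpl (not part of vgi_rpc) whose mutable state is guarded by a lock:

```python
import threading

class CounterImpl:
    """Hypothetical implementation object shared by connection threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._count = 0

    def increment(self) -> int:
        # serve_unix(..., threaded=True) can call into this object from
        # several connection threads at once, so guard the mutable state.
        with self._lock:
            self._count += 1
            return self._count

impl = CounterImpl()
results: list[int] = []
threads = [
    threading.Thread(target=lambda: results.append(impl.increment()))
    for _ in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Without the lock, two threads could read the same count and return duplicate values; with it, every call sees a distinct increment.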

Parameters:

- server (RpcServer): The RPC server to dispatch requests.
- path (str): Filesystem path for the Unix domain socket.
- threaded (bool, default: False): When True, serve each connection in a separate thread.
- max_connections (int | None, default: None): Maximum number of connections served simultaneously. Only meaningful when threaded is True; ignored otherwise. Excess connections are accepted but queued until a slot is free. None means unlimited.

Source code in vgi_rpc/rpc/_transport.py
def serve_unix(
    server: RpcServer,
    path: str,
    *,
    threaded: bool = False,
    max_connections: int | None = None,
) -> None:
    """Serve RPC on a Unix domain socket, accepting connections in a loop.

    Binds to *path*, listens, and accepts connections.  By default connections
    are handled sequentially (one at a time).  With ``threaded=True`` each
    accepted connection is served in its own daemon thread, allowing multiple
    clients to use the same socket concurrently.

    .. note::

       When ``threaded=True`` the *implementation* object passed to
       :class:`RpcServer` is shared across threads.  If it carries mutable
       state the caller must ensure thread-safety (e.g. via locks).  Per-
       connection stream state (:class:`StreamState`) is always isolated.

    Args:
        server: The RPC server to dispatch requests.
        path: Filesystem path for the Unix domain socket.
        threaded: When ``True``, serve each connection in a separate thread.
        max_connections: Maximum number of connections served simultaneously.
            Only meaningful when *threaded* is ``True``; ignored otherwise.
            Excess connections are accepted but queued until a slot is free.
            ``None`` means unlimited.

    """
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    with contextlib.suppress(FileNotFoundError):
        os.unlink(path)
    sock.bind(path)
    sock.listen(128 if threaded else 1)
    if wire_transport_logger.isEnabledFor(logging.DEBUG):
        wire_transport_logger.debug(
            "serve_unix: server_id=%s, protocol=%s, path=%s, threaded=%s",
            server.server_id,
            server.protocol_name,
            path,
            threaded,
        )
    try:
        if threaded:
            _serve_unix_threaded(server, sock, max_connections)
        else:
            _serve_unix_sequential(server, sock)
    finally:
        sock.close()
        with contextlib.suppress(FileNotFoundError):
            os.unlink(path)

unix_connect

unix_connect(
    protocol: type[P],
    path: str,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL
) -> Iterator[P]

Connect to a Unix domain socket RPC server and yield a typed proxy.

Parameters:

- protocol (type[P]): The Protocol class defining the RPC interface.
- path (str): Filesystem path of the Unix domain socket.
- on_log (Callable[[Message], None] | None, default: None): Optional callback for log messages from the server.
- external_location (ExternalLocationConfig | None, default: None): Optional ExternalLocation configuration for resolving and producing externalized batches.
- ipc_validation (IpcValidation, default: FULL): Validation level for incoming IPC batches.

Yields:

- P: A typed RPC proxy supporting all methods defined on protocol.

Source code in vgi_rpc/rpc/__init__.py
@contextlib.contextmanager
def unix_connect[P](
    protocol: type[P],
    path: str,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
) -> Iterator[P]:
    """Connect to a Unix domain socket RPC server and yield a typed proxy.

    Args:
        protocol: The Protocol class defining the RPC interface.
        path: Filesystem path of the Unix domain socket.
        on_log: Optional callback for log messages from the server.
        external_location: Optional ExternalLocation configuration for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    """
    import socket as _socket

    sock = _socket.socket(_socket.AF_UNIX, _socket.SOCK_STREAM)
    try:
        sock.connect(path)
    except BaseException:
        sock.close()
        raise
    transport = UnixTransport(sock)
    try:
        with RpcConnection(
            protocol, transport, on_log=on_log, external_location=external_location, ipc_validation=ipc_validation
        ) as proxy:
            yield proxy
    finally:
        transport.close()

serve_unix_pipe

serve_unix_pipe(
    protocol: type[P],
    implementation: object,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation | None = None
) -> Iterator[P]

Start an in-process Unix socket server and yield a typed client proxy.

Like serve_pipe but uses a Unix socketpair() instead of os.pipe() pairs. Useful for tests and demos — no subprocess needed. A background thread runs RpcServer.serve() on the server side.
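The socketpair() wiring can be sketched with the stdlib alone. Here recv_exactly is a hypothetical helper that loops because recv, like raw pipe reads, may return fewer bytes than requested:

```python
import socket

def recv_exactly(sock: socket.socket, n: int) -> bytes:
    # recv() may short-read, so loop until n bytes (or EOF) arrive.
    buf = b""
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            break
        buf += chunk
    return buf

client, server = socket.socketpair()  # connected pair, no filesystem path needed
client.sendall(b"ping")
request = recv_exactly(server, 4)
server.sendall(b"pong")
reply = recv_exactly(client, 4)
client.close()
server.close()
```

Unlike serve_unix, no socket file is created or unlinked, which is why this variant suits in-process tests.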

Parameters:

- protocol (type[P]): The Protocol class defining the RPC interface.
- implementation (object): The implementation object.
- on_log (Callable[[Message], None] | None, default: None): Optional callback for log messages from the server.
- external_location (ExternalLocationConfig | None, default: None): Optional ExternalLocation configuration for resolving and producing externalized batches.
- ipc_validation (IpcValidation | None, default: None): Validation level for incoming IPC batches. When None (the default), both components use IpcValidation.FULL.

Yields:

- P: A typed RPC proxy supporting all methods defined on protocol.

Source code in vgi_rpc/rpc/__init__.py
@contextlib.contextmanager
def serve_unix_pipe[P](
    protocol: type[P],
    implementation: object,
    *,
    on_log: Callable[[Message], None] | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation | None = None,
) -> Iterator[P]:
    """Start an in-process Unix socket server and yield a typed client proxy.

    Like :func:`serve_pipe` but uses a Unix ``socketpair()`` instead of
    ``os.pipe()`` pairs.  Useful for tests and demos — no subprocess needed.
    A background thread runs ``RpcServer.serve()`` on the server side.

    Args:
        protocol: The Protocol class defining the RPC interface.
        implementation: The implementation object.
        on_log: Optional callback for log messages from the server.
        external_location: Optional ExternalLocation configuration for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
            When ``None`` (the default), both components use
            ``IpcValidation.FULL``.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    """
    client_transport, server_transport = make_unix_pair()
    server = RpcServer(
        protocol,
        implementation,
        external_location=external_location,
        ipc_validation=ipc_validation if ipc_validation is not None else IpcValidation.FULL,
    )
    thread = threading.Thread(target=server.serve, args=(server_transport,), daemon=True)
    thread.start()
    try:
        with RpcConnection(
            protocol,
            client_transport,
            on_log=on_log,
            external_location=external_location,
            ipc_validation=ipc_validation if ipc_validation is not None else IpcValidation.FULL,
        ) as proxy:
            yield proxy
    finally:
        client_transport.close()
        thread.join(timeout=5)
        server_transport.close()