Transports¶
All transports implement the RpcTransport protocol (a readable + writable byte stream). Your service code is identical regardless of transport — only the setup differs.
Choosing a Transport¶
| Transport | Use case | Latency | Setup |
|---|---|---|---|
serve_pipe |
Tests, demos, embedded | Lowest (in-process) | One line |
connect / SubprocessTransport |
Isolated workers, CLI tools | Low (stdin/stdout) | Spawn a child process |
serve_unix / unix_connect |
Local IPC, long-lived services | Low (Unix socket) | Socket path |
ShmPipeTransport |
Co-located processes, large batches | Lowest (zero-copy) | Shared memory segment |
http_connect / make_wsgi_app |
Network services, browser clients | Higher (HTTP) | WSGI server + client |
Pipe (in-process, for tests)¶
from vgi_rpc import serve_pipe
with serve_pipe(MyService, MyServiceImpl()) as proxy:
result = proxy.add(a=1.0, b=2.0) # proxy is typed as MyService
Subprocess¶
# worker.py
from vgi_rpc import run_server
run_server(MyService, MyServiceImpl())
# client.py
from vgi_rpc import connect
with connect(MyService, ["python", "worker.py"]) as proxy:
result = proxy.add(a=1.0, b=2.0)
Unix domain socket¶
Low-latency local IPC without subprocess management. The server listens on a socket path; clients connect by path. Not available on Windows.
Server entry point:
# worker.py
from vgi_rpc.rpc import RpcServer, serve_unix
server = RpcServer(MyService, MyServiceImpl())
serve_unix(server, "/tmp/my-service.sock")
Client:
from vgi_rpc.rpc import unix_connect
with unix_connect(MyService, "/tmp/my-service.sock") as proxy:
result = proxy.add(a=1.0, b=2.0)
For in-process testing, serve_unix_pipe starts the server on a background thread with an auto-generated socket path:
from vgi_rpc.rpc import serve_unix_pipe
with serve_unix_pipe(MyService, MyServiceImpl()) as proxy:
result = proxy.add(a=1.0, b=2.0)
Use threaded=True on serve_unix to handle multiple concurrent clients (each connection gets its own thread):
Shared memory¶
Wraps a PipeTransport with a shared memory side-channel. When a batch fits in the segment, only a small pointer is sent over the pipe — the receiver reads data directly from shared memory:
from vgi_rpc import ShmPipeTransport, make_pipe_pair
from vgi_rpc.shm import ShmSegment
shm = ShmSegment.create(size=100 * 1024 * 1024) # 100 MB
client_pipe, server_pipe = make_pipe_pair()
client_transport = ShmPipeTransport(client_pipe, shm)
server_transport = ShmPipeTransport(server_pipe, shm)
Falls back to normal pipe IPC for batches that exceed the segment size.
Transport awareness¶
Workers can ask which transport they are bound to — useful for tailoring startup work, enabling transport-specific metrics, or branching per-call behaviour. Three things are exposed:
RpcServer.transport_kind: aTransportKindenum (PIPE,HTTP,UNIX) orNonebefore serving begins.RpcServer.transport_capabilities: afrozenset[str]of capability flags. Currently{"shm"}when bound to aShmPipeTransport; empty otherwise.CallContext.kind: per-call view of the sameTransportKind, so methods that already acceptctxcan branch without reaching for the server.
For one-shot startup work, an implementation may define an on_serve_start(self, kind) method. The framework calls it once per process before the first request is dispatched:
from vgi_rpc import CallContext, TransportKind
class MyServiceImpl:
def on_serve_start(self, kind: TransportKind) -> None:
"""Called once per process before the first request."""
if kind is TransportKind.HTTP:
self._cache = build_http_cache()
else:
self._cache = None
def fetch(self, key: str, ctx: CallContext) -> str:
if ctx.kind is TransportKind.HTTP and self._cache is not None:
return self._cache.get(key)
return load_from_disk(key)
The hook is duck-typed (no base class needed); a ServeStartHook Protocol is exported for users who want to type-hint their implementation. Hook exceptions propagate (and are logged via logging.getLogger("vgi_rpc.rpc").exception first), so a misconfigured worker dies loudly rather than serving in a broken state.
For pipe / unix transports the hook fires inside RpcServer.serve(transport). For HTTP it fires lazily on the first request handled in the current process — this is fork-safe under pre-fork WSGI servers (gunicorn, uwsgi), so each child worker runs its own startup logic. Subprocess workers report PIPE because they speak Arrow IPC over the parent's stdin/stdout.
SHM availability is exposed via transport_capabilities, not the enum, so coarse transport-kind checks stay simple while workers that need zero-copy paths can still detect shared memory:
def on_serve_start(self, kind: TransportKind) -> None:
if "shm" in self.server.transport_capabilities:
self._enable_zero_copy()
API Reference¶
PipeTransport¶
PipeTransport
¶
Transport backed by file-like IO streams (e.g. from os.pipe()).
Initialize with reader and writer streams.
Source code in vgi_rpc/rpc/_transport.py
SubprocessTransport¶
SubprocessTransport
¶
SubprocessTransport(
cmd: list[str],
*,
stderr: StderrMode = INHERIT,
stderr_logger: Logger | None = None
)
Transport that communicates with a child process over stdin/stdout.
Spawns a command via subprocess.Popen with stdin=PIPE,
stdout=PIPE, and configurable stderr handling via :class:StderrMode.
The writer (child's stdin) is kept unbuffered (bufsize=0) so IPC
data is flushed immediately. The reader (child's stdout) is wrapped
in a BufferedReader because Arrow IPC expects read(n) to
return exactly n bytes, but raw FileIO.read(n) on a pipe may
return fewer (POSIX short-read semantics).
Spawn the subprocess and wire up stdin/stdout as the transport.
| PARAMETER | DESCRIPTION |
|---|---|
cmd
|
Command to spawn.
TYPE:
|
stderr
|
How to handle the child's stderr stream.
TYPE:
|
stderr_logger
|
Logger for
TYPE:
|
Source code in vgi_rpc/rpc/_transport.py
close
¶
Close stdin (sends EOF), wait for exit, close stdout.
Source code in vgi_rpc/rpc/_transport.py
ShmPipeTransport¶
ShmPipeTransport
¶
ShmPipeTransport(pipe: PipeTransport, shm: ShmSegment)
Pipe transport with shared memory side-channel for batch data.
Does NOT own the ShmSegment — caller manages segment lifecycle.
Closing the transport closes the pipe only.
Initialize with a pipe transport and a shared memory segment.
Source code in vgi_rpc/rpc/_transport.py
StderrMode¶
StderrMode
¶
Bases: Enum
How to handle child process stderr in SubprocessTransport.
Members
INHERIT: Child stderr goes to parent's stderr (default).
PIPE: Parent drains child stderr via a daemon thread and
forwards each line to a logging.Logger.
DEVNULL: Child stderr discarded at OS level.
TransportKind¶
TransportKind
¶
Bases: StrEnum
Coarse identifier of the transport binding an :class:RpcServer.
Workers (RPC implementations) read this via :attr:RpcServer.transport_kind
or the on_serve_start lifecycle hook to tailor startup behaviour
(skip HTTP-only caching, enable transport-specific metrics, etc.).
Members are :class:StrEnum so the value is wire/log-friendly.
Members
PIPE: Stdio / PipeTransport / ShmPipeTransport. Subprocess
workers also report PIPE because they speak Arrow IPC over
the parent's stdin/stdout.
HTTP: WSGI / serve_http / make_wsgi_app.
UNIX: UnixTransport (AF_UNIX socket).
ServeStartHook¶
ServeStartHook
¶
Bases: Protocol
Optional lifecycle hook on the RPC implementation, fired once at startup.
If an implementation defines an on_serve_start(self, kind) method,
the framework calls it once per process before the first request is
dispatched. The hook is duck-typed — defining this Protocol is not
required; it exists purely for users who want to type-hint their impl.
For HTTP, the hook fires on the first request handled in the current
process (lazy / fork-safe). For pipe / unix transports, it fires
when RpcServer.serve(transport) begins.
A hook that raises propagates out of the serve path. The exception
is logged via logging.getLogger("vgi_rpc.rpc").exception first so
the trace is visible even when stderr is captured.
on_serve_start
¶
on_serve_start(kind: TransportKind) -> None
Utility Functions¶
make_pipe_pair
¶
make_pipe_pair() -> tuple[PipeTransport, PipeTransport]
Create connected client/server transports using os.pipe().
Returns (client_transport, server_transport).
Source code in vgi_rpc/rpc/_transport.py
serve_stdio
¶
serve_stdio(server: RpcServer) -> None
Serve RPC requests over stdin/stdout.
This is the server-side entry point for subprocess mode. The reader
uses default buffering so that read(n) returns exactly n bytes
(Arrow IPC requires this; raw FileIO.read(n) may short-read on
pipes). The writer is unbuffered (buffering=0) so IPC data is
flushed immediately. Uses closefd=False so the original stdio
descriptors are not closed on exit.
Emits a diagnostic warning to stderr when stdin or stdout is connected to a terminal, since the process expects binary Arrow IPC data.
Source code in vgi_rpc/rpc/_transport.py
serve_unix
¶
serve_unix(
server: RpcServer,
path: str,
*,
threaded: bool = False,
max_connections: int | None = None,
idle_timeout: float | None = None,
on_bound: Callable[[str], None] | None = None
) -> None
Serve RPC on a Unix domain socket, accepting connections in a loop.
Binds to path, listens, and accepts connections. By default connections
are handled sequentially (one at a time). With threaded=True each
accepted connection is served in its own daemon thread, allowing multiple
clients to use the same socket concurrently.
.. note::
When threaded=True the implementation object passed to
:class:RpcServer is shared across threads. If it carries mutable
state the caller must ensure thread-safety (e.g. via locks). Per-
connection stream state (:class:StreamState) is always isolated.
| PARAMETER | DESCRIPTION |
|---|---|
server
|
The RPC server to dispatch requests.
TYPE:
|
path
|
Filesystem path for the Unix domain socket.
TYPE:
|
threaded
|
When
TYPE:
|
max_connections
|
Maximum number of connections served simultaneously.
Only meaningful when threaded is
TYPE:
|
idle_timeout
|
When set, the worker self-terminates after this many
seconds with zero active connections. Only meaningful when
threaded is
TYPE:
|
on_bound
|
Optional callback invoked once the socket is bound and
listening, before the accept loop runs. Used by
TYPE:
|
| RAISES | DESCRIPTION |
|---|---|
RuntimeError
|
If another process is already listening on path. |
ValueError
|
If idle_timeout is set but threaded is |
Source code in vgi_rpc/rpc/_transport.py
402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 | |
unix_connect
¶
unix_connect(
protocol: type[P],
path: str,
*,
on_log: Callable[[Message], None] | None = None,
external_location: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation = FULL
) -> Iterator[P]
Connect to a Unix domain socket RPC server and yield a typed proxy.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
path
|
Filesystem path of the Unix domain socket.
TYPE:
|
on_log
|
Optional callback for log messages from the server.
TYPE:
|
external_location
|
Optional ExternalLocation configuration for resolving and producing externalized batches.
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
P
|
A typed RPC proxy supporting all methods defined on protocol. |
Source code in vgi_rpc/rpc/__init__.py
serve_unix_pipe
¶
serve_unix_pipe(
protocol: type[P],
implementation: object,
*,
on_log: Callable[[Message], None] | None = None,
external_location: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation | None = None
) -> Iterator[P]
Start an in-process Unix socket server and yield a typed client proxy.
Like :func:serve_pipe but uses a Unix socketpair() instead of
os.pipe() pairs. Useful for tests and demos — no subprocess needed.
A background thread runs RpcServer.serve() on the server side.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
implementation
|
The implementation object.
TYPE:
|
on_log
|
Optional callback for log messages from the server.
TYPE:
|
external_location
|
Optional ExternalLocation configuration for resolving and producing externalized batches.
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
When
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
P
|
A typed RPC proxy supporting all methods defined on protocol. |