Transports¶
All transports implement the RpcTransport protocol (a readable + writable byte stream). Your service code is identical regardless of transport — only the setup differs.
Choosing a Transport¶
| Transport | Use case | Latency | Setup |
|---|---|---|---|
serve_pipe |
Tests, demos, embedded | Lowest (in-process) | One line |
connect / SubprocessTransport |
Isolated workers, CLI tools | Low (stdin/stdout) | Spawn a child process |
serve_unix / unix_connect |
Local IPC, long-lived services | Low (Unix socket) | Socket path |
ShmPipeTransport |
Co-located processes, large batches | Lowest (zero-copy) | Shared memory segment |
http_connect / make_wsgi_app |
Network services, browser clients | Higher (HTTP) | WSGI server + client |
Pipe (in-process, for tests)¶
from vgi_rpc import serve_pipe
with serve_pipe(MyService, MyServiceImpl()) as proxy:
result = proxy.add(a=1.0, b=2.0) # proxy is typed as MyService
Subprocess¶
# worker.py
from vgi_rpc import run_server
run_server(MyService, MyServiceImpl())
# client.py
from vgi_rpc import connect
with connect(MyService, ["python", "worker.py"]) as proxy:
result = proxy.add(a=1.0, b=2.0)
Unix domain socket¶
Low-latency local IPC without subprocess management. The server listens on a socket path; clients connect by path. Not available on Windows.
Server entry point:
# worker.py
from vgi_rpc.rpc import RpcServer, serve_unix
server = RpcServer(MyService, MyServiceImpl())
serve_unix(server, "/tmp/my-service.sock")
Client:
from vgi_rpc.rpc import unix_connect
with unix_connect(MyService, "/tmp/my-service.sock") as proxy:
result = proxy.add(a=1.0, b=2.0)
For in-process testing, serve_unix_pipe starts the server on a background thread with an auto-generated socket path:
from vgi_rpc.rpc import serve_unix_pipe
with serve_unix_pipe(MyService, MyServiceImpl()) as proxy:
result = proxy.add(a=1.0, b=2.0)
Use threaded=True on serve_unix to handle multiple concurrent clients (each connection gets its own thread):
Shared memory¶
Wraps a PipeTransport with a shared memory side-channel. When a batch fits in the segment, only a small pointer is sent over the pipe — the receiver reads data directly from shared memory:
from vgi_rpc import ShmPipeTransport, make_pipe_pair
from vgi_rpc.shm import ShmSegment
shm = ShmSegment.create(size=100 * 1024 * 1024) # 100 MB
client_pipe, server_pipe = make_pipe_pair()
client_transport = ShmPipeTransport(client_pipe, shm)
server_transport = ShmPipeTransport(server_pipe, shm)
Falls back to normal pipe IPC for batches that exceed the segment size.
API Reference¶
PipeTransport¶
PipeTransport
¶
Transport backed by file-like IO streams (e.g. from os.pipe()).
Initialize with reader and writer streams.
Source code in vgi_rpc/rpc/_transport.py
SubprocessTransport¶
SubprocessTransport
¶
SubprocessTransport(
cmd: list[str],
*,
stderr: StderrMode = INHERIT,
stderr_logger: Logger | None = None
)
Transport that communicates with a child process over stdin/stdout.
Spawns a command via subprocess.Popen with stdin=PIPE,
stdout=PIPE, and configurable stderr handling via :class:StderrMode.
The writer (child's stdin) is kept unbuffered (bufsize=0) so IPC
data is flushed immediately. The reader (child's stdout) is wrapped
in a BufferedReader because Arrow IPC expects read(n) to
return exactly n bytes, but raw FileIO.read(n) on a pipe may
return fewer (POSIX short-read semantics).
Spawn the subprocess and wire up stdin/stdout as the transport.
| PARAMETER | DESCRIPTION |
|---|---|
cmd
|
Command to spawn.
TYPE:
|
stderr
|
How to handle the child's stderr stream.
TYPE:
|
stderr_logger
|
Logger for
TYPE:
|
Source code in vgi_rpc/rpc/_transport.py
close
¶
Close stdin (sends EOF), wait for exit, close stdout.
Source code in vgi_rpc/rpc/_transport.py
ShmPipeTransport¶
ShmPipeTransport
¶
ShmPipeTransport(pipe: PipeTransport, shm: ShmSegment)
Pipe transport with shared memory side-channel for batch data.
Does NOT own the ShmSegment — caller manages segment lifecycle.
Closing the transport closes the pipe only.
Initialize with a pipe transport and a shared memory segment.
Source code in vgi_rpc/rpc/_transport.py
StderrMode¶
StderrMode
¶
Bases: Enum
How to handle child process stderr in SubprocessTransport.
Members
INHERIT: Child stderr goes to parent's stderr (default).
PIPE: Parent drains child stderr via a daemon thread and
forwards each line to a logging.Logger.
DEVNULL: Child stderr discarded at OS level.
Utility Functions¶
make_pipe_pair
¶
make_pipe_pair() -> tuple[PipeTransport, PipeTransport]
Create connected client/server transports using os.pipe().
Returns (client_transport, server_transport).
Source code in vgi_rpc/rpc/_transport.py
serve_stdio
¶
serve_stdio(server: RpcServer) -> None
Serve RPC requests over stdin/stdout.
This is the server-side entry point for subprocess mode. The reader
uses default buffering so that read(n) returns exactly n bytes
(Arrow IPC requires this; raw FileIO.read(n) may short-read on
pipes). The writer is unbuffered (buffering=0) so IPC data is
flushed immediately. Uses closefd=False so the original stdio
descriptors are not closed on exit.
Emits a diagnostic warning to stderr when stdin or stdout is connected to a terminal, since the process expects binary Arrow IPC data.
Source code in vgi_rpc/rpc/_transport.py
serve_unix
¶
serve_unix(
server: RpcServer,
path: str,
*,
threaded: bool = False,
max_connections: int | None = None
) -> None
Serve RPC on a Unix domain socket, accepting connections in a loop.
Binds to path, listens, and accepts connections. By default connections
are handled sequentially (one at a time). With threaded=True each
accepted connection is served in its own daemon thread, allowing multiple
clients to use the same socket concurrently.
.. note::
When threaded=True the implementation object passed to
:class:RpcServer is shared across threads. If it carries mutable
state the caller must ensure thread-safety (e.g. via locks). Per-
connection stream state (:class:StreamState) is always isolated.
| PARAMETER | DESCRIPTION |
|---|---|
server
|
The RPC server to dispatch requests.
TYPE:
|
path
|
Filesystem path for the Unix domain socket.
TYPE:
|
threaded
|
When
TYPE:
|
max_connections
|
Maximum number of connections served simultaneously.
Only meaningful when threaded is
TYPE:
|
Source code in vgi_rpc/rpc/_transport.py
unix_connect
¶
unix_connect(
protocol: type[P],
path: str,
*,
on_log: Callable[[Message], None] | None = None,
external_location: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation = FULL
) -> Iterator[P]
Connect to a Unix domain socket RPC server and yield a typed proxy.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
path
|
Filesystem path of the Unix domain socket.
TYPE:
|
on_log
|
Optional callback for log messages from the server.
TYPE:
|
external_location
|
Optional ExternalLocation configuration for resolving and producing externalized batches.
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
P
|
A typed RPC proxy supporting all methods defined on protocol. |
Source code in vgi_rpc/rpc/__init__.py
serve_unix_pipe
¶
serve_unix_pipe(
protocol: type[P],
implementation: object,
*,
on_log: Callable[[Message], None] | None = None,
external_location: ExternalLocationConfig | None = None,
ipc_validation: IpcValidation | None = None
) -> Iterator[P]
Start an in-process Unix socket server and yield a typed client proxy.
Like :func:serve_pipe but uses a Unix socketpair() instead of
os.pipe() pairs. Useful for tests and demos — no subprocess needed.
A background thread runs RpcServer.serve() on the server side.
| PARAMETER | DESCRIPTION |
|---|---|
protocol
|
The Protocol class defining the RPC interface.
TYPE:
|
implementation
|
The implementation object.
TYPE:
|
on_log
|
Optional callback for log messages from the server.
TYPE:
|
external_location
|
Optional ExternalLocation configuration for resolving and producing externalized batches.
TYPE:
|
ipc_validation
|
Validation level for incoming IPC batches.
When
TYPE:
|
| YIELDS | DESCRIPTION |
|---|---|
P
|
A typed RPC proxy supporting all methods defined on protocol. |