Skip to content

HTTP Transport

HTTP transport using Falcon (server) and httpx (client). Requires pip install vgi-rpc[http].

Quick Start

Server

Create a WSGI app and serve it with any WSGI server (waitress, gunicorn, etc.):

from vgi_rpc import RpcServer, make_wsgi_app

server = RpcServer(MyService, MyServiceImpl())
app = make_wsgi_app(server)
# serve `app` with waitress, gunicorn, etc.

Client

from vgi_rpc import http_connect

with http_connect(MyService, "http://localhost:8080") as proxy:
    result = proxy.echo(message="hello")  # proxy is typed as MyService

Testing (no real server)

make_sync_client wraps a Falcon TestClient so you can test the full HTTP stack in-process:

from vgi_rpc import RpcServer
from vgi_rpc.http import http_connect, make_sync_client

server = RpcServer(MyService, MyServiceImpl())
client = make_sync_client(server)

with http_connect(MyService, client=client) as proxy:
    assert proxy.echo(message="hello") == "hello"

API Reference

Server

make_wsgi_app

make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "/vgi",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    cors_origins: str | Iterable[str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    token_ttl: int = 3600
) -> App[Request, Response]

Create a Falcon WSGI app that serves RPC requests over HTTP.

PARAMETER DESCRIPTION
server

The RpcServer instance to serve.

TYPE: RpcServer

prefix

URL prefix for all RPC endpoints (default /vgi).

TYPE: str DEFAULT: '/vgi'

signing_key

HMAC key for signing state tokens. When None (the default), a random 32-byte key is generated per process. This means state tokens issued by one worker are invalid in another — you must provide a shared key for multi-process deployments (e.g. gunicorn with multiple workers).

TYPE: bytes | None DEFAULT: None

max_stream_response_bytes

When set, producer stream responses are broken into multiple HTTP exchanges once the response body exceeds this size. The client transparently resumes via POST /{method}/exchange. None (default) disables resumable streaming.

TYPE: int | None DEFAULT: None

max_request_bytes

When set, the value is advertised via the VGI-Max-Request-Bytes response header on every response (including OPTIONS). Clients can use http_capabilities() to discover this limit and decide whether to use external storage for large payloads. Advertisement only — no server-side enforcement. None (default) omits the header.

TYPE: int | None DEFAULT: None

authenticate

Optional callback that extracts an :class:AuthContext from a Falcon Request. When provided, every request is authenticated before dispatch. The callback should raise ValueError (bad credentials) or PermissionError (forbidden) on failure — these are mapped to HTTP 401. Other exceptions propagate as 500.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

cors_origins

Allowed origins for CORS. Pass "*" to allow all origins, a single origin string like "https://example.com", or an iterable of origin strings. None (the default) disables CORS headers. Uses Falcon's built-in CORSMiddleware which also handles preflight OPTIONS requests automatically.

TYPE: str | Iterable[str] | None DEFAULT: None

upload_url_provider

Optional provider for generating pre-signed upload URLs. When set, the __upload_url__/init endpoint is enabled and VGI-Upload-URL-Support: true is advertised on every response.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

When set (and upload_url_provider is set), advertised via the VGI-Max-Upload-Bytes header. Informs clients of the maximum size they may upload to vended URLs. Advertisement only — no server-side enforcement.

TYPE: int | None DEFAULT: None

otel_config

Optional OtelConfig for OpenTelemetry instrumentation. When provided, instrument_server() is called and _OtelFalconMiddleware is prepended for W3C trace propagation. Requires pip install vgi-rpc[otel].

TYPE: object | None DEFAULT: None

token_ttl

Maximum age of stream state tokens in seconds. Tokens older than this are rejected with HTTP 400. Default is 3600 (1 hour). Set to 0 to disable expiry checking.

TYPE: int DEFAULT: 3600

RETURNS DESCRIPTION
App[Request, Response]

A Falcon application with routes for unary and stream RPC calls.

Source code in vgi_rpc/http/_server.py
def make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "/vgi",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    cors_origins: str | Iterable[str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    token_ttl: int = 3600,
) -> falcon.App[falcon.Request, falcon.Response]:
    """Create a Falcon WSGI app that serves RPC requests over HTTP.

    Args:
        server: The RpcServer instance to serve.
        prefix: URL prefix for all RPC endpoints (default ``/vgi``).
        signing_key: HMAC key for signing state tokens.  When ``None``
            (the default), a random 32-byte key is generated **per process**.
            This means state tokens issued by one worker are invalid in
            another — you **must** provide a shared key for multi-process
            deployments (e.g. gunicorn with multiple workers).
        max_stream_response_bytes: When set, producer stream responses are
            broken into multiple HTTP exchanges once the response body
            exceeds this size.  The client transparently resumes via
            ``POST /{method}/exchange``.  ``None`` (default) disables
            resumable streaming.
        max_request_bytes: When set, the value is advertised via the
            ``VGI-Max-Request-Bytes`` response header on every response
            (including OPTIONS).  Clients can use ``http_capabilities()``
            to discover this limit and decide whether to use external
            storage for large payloads.  Advertisement only — no
            server-side enforcement.  ``None`` (default) omits the header.
        authenticate: Optional callback that extracts an :class:`AuthContext`
            from a Falcon ``Request``.  When provided, every request is
            authenticated before dispatch.  The callback should raise
            ``ValueError`` (bad credentials) or ``PermissionError``
            (forbidden) on failure — these are mapped to HTTP 401.
            Other exceptions propagate as 500.
        cors_origins: Allowed origins for CORS.  Pass ``"*"`` to allow all
            origins, a single origin string like ``"https://example.com"``,
            or an iterable of origin strings.  ``None`` (the default)
            disables CORS headers.  Uses Falcon's built-in
            ``CORSMiddleware`` which also handles preflight OPTIONS
            requests automatically.
        upload_url_provider: Optional provider for generating pre-signed
            upload URLs.  When set, the ``__upload_url__/init`` endpoint
            is enabled and ``VGI-Upload-URL-Support: true`` is advertised
            on every response.
        max_upload_bytes: When set (and ``upload_url_provider`` is set),
            advertised via the ``VGI-Max-Upload-Bytes`` header.  Informs
            clients of the maximum size they may upload to vended URLs.
            Advertisement only — no server-side enforcement.
        otel_config: Optional ``OtelConfig`` for OpenTelemetry instrumentation.
            When provided, ``instrument_server()`` is called and
            ``_OtelFalconMiddleware`` is prepended for W3C trace propagation.
            Requires ``pip install vgi-rpc[otel]``.
        token_ttl: Maximum age of stream state tokens in seconds.  Tokens
            older than this are rejected with HTTP 400.  Default is 3600
            (1 hour).  Set to ``0`` to disable expiry checking.

    Returns:
        A Falcon application with routes for unary and stream RPC calls.

    """
    if signing_key is None:
        warnings.warn(
            "No signing_key provided; generating a random per-process key. "
            "State tokens will be invalid across workers — pass a shared key "
            "for multi-process deployments.",
            stacklevel=2,
        )
        signing_key = os.urandom(32)
    # OpenTelemetry instrumentation (optional)
    if otel_config is not None:
        from vgi_rpc.otel import OtelConfig, _OtelFalconMiddleware, instrument_server

        if not isinstance(otel_config, OtelConfig):
            raise TypeError(f"otel_config must be an OtelConfig instance, got {type(otel_config).__name__}")
        instrument_server(server, otel_config)

    app_handler = _HttpRpcApp(
        server,
        signing_key,
        max_stream_response_bytes,
        max_request_bytes,
        upload_url_provider,
        max_upload_bytes,
        token_ttl,
    )
    middleware: list[Any] = [_RequestIdMiddleware()]

    # OTel middleware must come before auth so spans cover the full request
    if otel_config is not None:
        middleware.append(_OtelFalconMiddleware())

    cors_expose: list[str] = []

    # Build capability headers
    capability_headers: dict[str, str] = {}
    if max_request_bytes is not None:
        capability_headers[MAX_REQUEST_BYTES_HEADER] = str(max_request_bytes)
        cors_expose.append(MAX_REQUEST_BYTES_HEADER)
    if upload_url_provider is not None:
        capability_headers[UPLOAD_URL_HEADER] = "true"
        cors_expose.append(UPLOAD_URL_HEADER)
        if max_upload_bytes is not None:
            capability_headers[MAX_UPLOAD_BYTES_HEADER] = str(max_upload_bytes)
            cors_expose.append(MAX_UPLOAD_BYTES_HEADER)

    if cors_origins is not None:
        cors_kwargs: dict[str, Any] = {"allow_origins": cors_origins}
        if cors_expose:
            cors_kwargs["expose_headers"] = cors_expose
        middleware.append(falcon.CORSMiddleware(**cors_kwargs))
    if authenticate is not None:
        middleware.append(_AuthMiddleware(authenticate))
    if capability_headers:
        middleware.append(_CapabilitiesMiddleware(capability_headers))
    app: falcon.App[falcon.Request, falcon.Response] = falcon.App(middleware=middleware or None)
    app.add_route(f"{prefix}/{{method}}", _RpcResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/init", _StreamInitResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/exchange", _ExchangeResource(app_handler))
    if upload_url_provider is not None:
        app.add_route(f"{prefix}/__upload_url__/init", _UploadUrlResource(app_handler))

    _logger.info(
        "WSGI app created for %s (server_id=%s, prefix=%s, auth=%s)",
        server.protocol_name,
        server.server_id,
        prefix,
        "enabled" if authenticate is not None else "disabled",
        extra={
            "server_id": server.server_id,
            "protocol": server.protocol_name,
            "prefix": prefix,
            "auth_enabled": authenticate is not None,
        },
    )

    return app

Client

http_connect

http_connect(
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str = "/vgi",
    on_log: Callable[[Message], None] | None = None,
    client: Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None
) -> Iterator[P]

Connect to an HTTP RPC server and yield a typed proxy.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None; ignored when a pre-built client is provided. The internally-created client follows redirects transparently.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix (default /vgi).

TYPE: str DEFAULT: '/vgi'

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

client

Optional HTTP client — httpx.Client for production, or a _SyncTestClient from make_sync_client() for testing.

TYPE: Client | _SyncTestClient | None DEFAULT: None

external_location

Optional ExternalLocationConfig for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures. When None (the default), no retries are attempted.

TYPE: HttpRetryConfig | None DEFAULT: None

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
@contextlib.contextmanager
def http_connect[P](
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str = "/vgi",
    on_log: Callable[[Message], None] | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
) -> Iterator[P]:
    """Connect to an HTTP RPC server and yield a typed proxy.

    Args:
        protocol: The Protocol class defining the RPC interface.
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``; ignored when a pre-built
            *client* is provided.  The internally-created client follows
            redirects transparently.
        prefix: URL prefix matching the server's prefix (default ``/vgi``).
        on_log: Optional callback for log messages from the server.
        client: Optional HTTP client — ``httpx.Client`` for production,
            or a ``_SyncTestClient`` from ``make_sync_client()`` for testing.
        external_location: Optional ExternalLocationConfig for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.
            When ``None`` (the default), no retries are attempted.

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    url_prefix = prefix
    try:
        yield cast(
            P,
            _HttpProxy(
                protocol,
                client,
                url_prefix,
                on_log,
                external_config=external_location,
                ipc_validation=ipc_validation,
                retry_config=retry,
            ),
        )
    finally:
        if own_client:
            client.close()

http_introspect

http_introspect(
    base_url: str | None = None,
    *,
    prefix: str = "/vgi",
    client: Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None
) -> ServiceDescription

Send a __describe__ request over HTTP and return a ServiceDescription.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix (default /vgi).

TYPE: str DEFAULT: '/vgi'

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
ServiceDescription

A ServiceDescription with all method metadata.

RAISES DESCRIPTION
RpcError

If the server does not support introspection or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_introspect(
    base_url: str | None = None,
    *,
    prefix: str = "/vgi",
    client: httpx.Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
) -> ServiceDescription:
    """Send a ``__describe__`` request over HTTP and return a ``ServiceDescription``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix (default ``/vgi``).
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A ``ServiceDescription`` with all method metadata.

    Raises:
        RpcError: If the server does not support introspection or returns
            an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    from vgi_rpc.introspect import DESCRIBE_METHOD_NAME, parse_describe_batch

    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    try:
        # Build a minimal request: empty params with __describe__ method name
        req_buf = BytesIO()
        request_metadata = pa.KeyValueMetadata(
            {
                b"vgi_rpc.method": DESCRIBE_METHOD_NAME.encode(),
                b"vgi_rpc.request_version": b"1",
            }
        )
        with ipc.new_stream(req_buf, _EMPTY_SCHEMA) as writer:
            writer.write_batch(
                pa.RecordBatch.from_pydict({}, schema=_EMPTY_SCHEMA),
                custom_metadata=request_metadata,
            )

        resp = _post_with_retry(
            client,
            f"{prefix}/{DESCRIBE_METHOD_NAME}",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        reader = _open_response_stream(resp.content, resp.status_code, ipc_validation)
        # Skip log batches
        while True:
            batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            if not _dispatch_log_or_error(batch, custom_metadata):
                break
        _drain_stream(reader)

        return parse_describe_batch(batch, custom_metadata)
    finally:
        if own_client:
            client.close()

http_capabilities

http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str = "/vgi",
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> HttpServerCapabilities

Discover server capabilities via an OPTIONS request.

Sends OPTIONS {prefix}/__capabilities__ and reads capability headers (VGI-Max-Request-Bytes, VGI-Upload-URL-Support, VGI-Max-Upload-Bytes) from the response.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix (default /vgi).

TYPE: str DEFAULT: '/vgi'

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
HttpServerCapabilities

An HttpServerCapabilities with discovered values.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str = "/vgi",
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> HttpServerCapabilities:
    """Discover server capabilities via an OPTIONS request.

    Sends ``OPTIONS {prefix}/__capabilities__`` and reads capability
    headers (``VGI-Max-Request-Bytes``, ``VGI-Upload-URL-Support``,
    ``VGI-Max-Upload-Bytes``) from the response.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix (default ``/vgi``).
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        An ``HttpServerCapabilities`` with discovered values.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    try:
        url = f"{prefix}/__capabilities__"
        resp = _options_with_retry(client, url, config=retry)
        headers = resp.headers

        max_req: int | None = None
        raw = headers.get(MAX_REQUEST_BYTES_HEADER) or headers.get(MAX_REQUEST_BYTES_HEADER.lower())
        if raw is not None:
            with contextlib.suppress(ValueError):
                max_req = int(raw)

        upload_raw = headers.get(UPLOAD_URL_HEADER) or headers.get(UPLOAD_URL_HEADER.lower())
        upload_support = upload_raw == "true" if upload_raw is not None else False

        max_upload: int | None = None
        upload_bytes_raw = headers.get(MAX_UPLOAD_BYTES_HEADER) or headers.get(MAX_UPLOAD_BYTES_HEADER.lower())
        if upload_bytes_raw is not None:
            with contextlib.suppress(ValueError):
                max_upload = int(upload_bytes_raw)

        return HttpServerCapabilities(
            max_request_bytes=max_req,
            upload_url_support=upload_support,
            max_upload_bytes=max_upload,
        )
    finally:
        if own_client:
            client.close()

request_upload_urls

request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str = "/vgi",
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> list[UploadUrl]

Request pre-signed upload URLs from the server's __upload_url__ endpoint.

The server must have been configured with an upload_url_provider in make_wsgi_app().

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

count

Number of upload URLs to request (default 1, max 100).

TYPE: int DEFAULT: 1

prefix

URL prefix matching the server's prefix (default /vgi).

TYPE: str DEFAULT: '/vgi'

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
list[UploadUrl]

A list of UploadUrl objects with pre-signed PUT and GET URLs.

RAISES DESCRIPTION
RpcError

If the server does not support upload URLs (404) or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str = "/vgi",
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> list[UploadUrl]:
    """Request pre-signed upload URLs from the server's ``__upload_url__`` endpoint.

    The server must have been configured with an ``upload_url_provider``
    in ``make_wsgi_app()``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        count: Number of upload URLs to request (default 1, max 100).
        prefix: URL prefix matching the server's prefix (default ``/vgi``).
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A list of ``UploadUrl`` objects with pre-signed PUT and GET URLs.

    Raises:
        RpcError: If the server does not support upload URLs (404) or
            returns an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    try:
        # Build request IPC with standard wire protocol metadata
        req_buf = BytesIO()
        _write_request(req_buf, _UPLOAD_URL_METHOD, _UPLOAD_URL_PARAMS_SCHEMA, {"count": count})

        resp = _post_with_retry(
            client,
            f"{prefix}/__upload_url__/init",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        # Without an upload_url_provider the route doesn't exist and the
        # request falls through to _StreamInitResource → 404.
        if resp.status_code == HTTPStatus.NOT_FOUND:
            raise RpcError("NotSupported", "Server does not support upload URLs", "")

        reader = _open_response_stream(resp.content, resp.status_code)
        urls: list[UploadUrl] = []
        try:
            while True:
                try:
                    batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
                except StopIteration:
                    break

                if _dispatch_log_or_error(batch, custom_metadata):
                    continue

                for i in range(batch.num_rows):
                    upload_url = batch.column("upload_url")[i].as_py()
                    download_url = batch.column("download_url")[i].as_py()
                    expires_at = batch.column("expires_at")[i].as_py()
                    urls.append(UploadUrl(upload_url=upload_url, download_url=download_url, expires_at=expires_at))
        except RpcError:
            _drain_stream(reader)
            raise
        _drain_stream(reader)
        return urls
    finally:
        if own_client:
            client.close()

Capabilities

HttpServerCapabilities dataclass

HttpServerCapabilities(
    max_request_bytes: int | None = None,
    upload_url_support: bool = False,
    max_upload_bytes: int | None = None,
)

Capabilities advertised by an HTTP RPC server.

ATTRIBUTE DESCRIPTION
max_request_bytes

Maximum request body size the server advertises, or None if the server does not advertise a limit. Advertisement only -- no server-side enforcement.

TYPE: int | None

upload_url_support

Whether the server supports the __upload_url__ endpoint for client-side uploads.

TYPE: bool

max_upload_bytes

Maximum upload size the server advertises for client-vended URLs, or None if not advertised. Advertisement only -- no server-side enforcement.

TYPE: int | None

Stream Session

HttpStreamSession

HttpStreamSession(
    client: Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None
)

Client-side handle for a stream over HTTP (both producer and exchange patterns).

For producer streams, use __iter__() — yields batches from batched responses and follows continuation tokens transparently. For exchange streams, use exchange() — sends an input batch and receives an output batch.

Supports context manager protocol for convenience.

Initialize with HTTP client, method details, and initial state.

Source code in vgi_rpc/http/_client.py
def __init__(
    self,
    client: httpx.Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: pa.Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
) -> None:
    """Initialize with HTTP client, method details, and initial state."""
    self._client = client
    self._url_prefix = url_prefix
    self._method = method
    self._state_bytes = state_bytes
    self._output_schema = output_schema
    self._on_log = on_log
    self._external_config = external_config
    self._ipc_validation = ipc_validation
    self._pending_batches: list[AnnotatedBatch] = pending_batches or []
    self._finished = finished
    self._header = header
    self._retry_config = retry_config

header property

header: object | None

The stream header, or None if the stream has no header.

typed_header

typed_header(header_type: type[H]) -> H

Return the stream header narrowed to the expected type.

PARAMETER DESCRIPTION
header_type

The expected header dataclass type.

TYPE: type[H]

RETURNS DESCRIPTION
H

The header, typed as header_type.

RAISES DESCRIPTION
TypeError

If the header is None or not an instance of header_type.

Source code in vgi_rpc/http/_client.py
def typed_header[H: ArrowSerializableDataclass](self, header_type: type[H]) -> H:
    """Return the stream header narrowed to the expected type.

    Args:
        header_type: The expected header dataclass type.

    Returns:
        The header, typed as *header_type*.

    Raises:
        TypeError: If the header is ``None`` or not an instance of
            *header_type*.

    """
    if self._header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(self._header, header_type):
        raise TypeError(f"Header type mismatch: expected {header_type.__name__}, got {type(self._header).__name__}")
    return self._header

exchange

exchange(input_batch: AnnotatedBatch) -> AnnotatedBatch

Send an input batch and receive the output batch.

PARAMETER DESCRIPTION
input_batch

The input batch to send.

TYPE: AnnotatedBatch

RETURNS DESCRIPTION
AnnotatedBatch

The output batch from the server.

RAISES DESCRIPTION
RpcError

If the server reports an error or the stream has finished.

Source code in vgi_rpc/http/_client.py
def exchange(self, input_batch: AnnotatedBatch) -> AnnotatedBatch:
    """Send an input batch and receive the output batch.

    Args:
        input_batch: The input batch to send.

    Returns:
        The output batch from the server.

    Raises:
        RpcError: If the server reports an error or the stream has finished.

    """
    if self._state_bytes is None:
        raise RpcError("ProtocolError", "Stream has finished — no state token available", "")

    batch_to_write = input_batch.batch
    cm_to_write = input_batch.custom_metadata

    # Client-side externalization for large inputs
    if self._external_config is not None:
        batch_to_write, cm_to_write = maybe_externalize_batch(batch_to_write, cm_to_write, self._external_config)

    # Write input batch with state in metadata
    req_buf = BytesIO()
    state_md = pa.KeyValueMetadata({STATE_KEY: self._state_bytes})
    merged = merge_metadata(cm_to_write, state_md)
    with ipc.new_stream(req_buf, batch_to_write.schema) as writer:
        writer.write_batch(batch_to_write, custom_metadata=merged)

    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange: method=%s, input=%s",
            self._method,
            fmt_batch(batch_to_write),
        )
    # Exchange calls are NOT retried: the server's process() method may
    # have side effects, and a proxy 502 after server processing would
    # cause duplicate execution.  Only init/unary/continuation are retried.
    resp = self._client.post(
        f"{self._url_prefix}/{self._method}/exchange",
        content=req_buf.getvalue(),
        headers={"Content-Type": _ARROW_CONTENT_TYPE},
    )
    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange response: method=%s, status=%d, size=%d",
            self._method,
            resp.status_code,
            len(resp.content),
        )

    # Read response — log batches + data batch with state
    reader = _open_response_stream(resp.content, resp.status_code, self._ipc_validation)
    try:
        ab = _read_batch_with_log_check(reader, self._on_log, self._external_config)
    except RpcError:
        _drain_stream(reader)
        raise

    # Extract updated state from metadata
    if ab.custom_metadata is not None:
        new_state = ab.custom_metadata.get(STATE_KEY)
        if new_state is not None:
            self._state_bytes = new_state

    # Strip state token from user-visible metadata
    user_cm = strip_keys(ab.custom_metadata, STATE_KEY)

    _drain_stream(reader)
    return AnnotatedBatch(batch=ab.batch, custom_metadata=user_cm)

__iter__

__iter__() -> Iterator[AnnotatedBatch]

Iterate over output batches from a producer stream.

Yields pre-loaded batches from init, then follows continuation tokens.

Source code in vgi_rpc/http/_client.py
def __iter__(self) -> Iterator[AnnotatedBatch]:
    """Iterate over output batches from a producer stream.

    Yields pre-loaded batches from init, then follows continuation tokens.
    """
    # Yield pre-loaded batches from init response
    yield from self._pending_batches
    self._pending_batches.clear()

    if self._finished:
        return

    # Follow continuation tokens
    if self._state_bytes is None:
        return

    reader: ValidatedReader | None = None
    try:
        reader = self._send_continuation(self._state_bytes)
        while True:
            try:
                batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            except StopIteration:
                break

            # Check for continuation token (zero-row batch with STATE_KEY)
            if batch.num_rows == 0 and custom_metadata is not None:
                token = custom_metadata.get(STATE_KEY)
                if token is not None:
                    if not isinstance(token, bytes):
                        raise TypeError(f"Expected bytes for state token, got {type(token).__name__}")
                    _drain_stream(reader)
                    reader = self._send_continuation(token)
                    continue

            # Dispatch log/error batches
            if _dispatch_log_or_error(batch, custom_metadata, self._on_log):
                continue

            resolved_batch, resolved_cm = resolve_external_location(
                batch, custom_metadata, self._external_config, self._on_log, reader.ipc_validation
            )
            yield AnnotatedBatch(batch=resolved_batch, custom_metadata=resolved_cm)
    except RpcError:
        if reader is not None:
            _drain_stream(reader)
        raise

close

close() -> None

Close the session (no-op for HTTP — stateless).

Source code in vgi_rpc/http/_client.py
def close(self) -> None:
    """Close the session (no-op for HTTP — stateless)."""

__enter__

__enter__() -> HttpStreamSession

Enter the context.

Source code in vgi_rpc/http/_client.py
def __enter__(self) -> HttpStreamSession:
    """Enter the context."""
    return self

__exit__

__exit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None

Exit the context.

Source code in vgi_rpc/http/_client.py
def __exit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None:
    """Exit the context."""
    self.close()

Testing

make_sync_client

make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "/vgi",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    token_ttl: int = 3600
) -> _SyncTestClient

Create a synchronous test client for an RpcServer.

Uses falcon.testing.TestClient internally — no real HTTP server needed.

PARAMETER DESCRIPTION
server

The RpcServer to test.

TYPE: RpcServer

prefix

URL prefix for RPC endpoints (default /vgi).

TYPE: str DEFAULT: '/vgi'

signing_key

HMAC key for signing state tokens (see make_wsgi_app for details).

TYPE: bytes | None DEFAULT: None

max_stream_response_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_request_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

authenticate

See make_wsgi_app.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

default_headers

Headers merged into every request (e.g. auth tokens).

TYPE: dict[str, str] | None DEFAULT: None

upload_url_provider

See make_wsgi_app.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

otel_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

token_ttl

See make_wsgi_app.

TYPE: int DEFAULT: 3600

RETURNS DESCRIPTION
_SyncTestClient

A sync client that can be passed to http_connect(client=...).

Source code in vgi_rpc/http/_testing.py
def make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "/vgi",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    token_ttl: int = 3600,
) -> _SyncTestClient:
    """Create a synchronous test client for an RpcServer.

    Uses ``falcon.testing.TestClient`` internally — no real HTTP server needed.

    Args:
        server: The RpcServer to test.
        prefix: URL prefix for RPC endpoints (default ``/vgi``).
        signing_key: HMAC key for signing state tokens (see
            ``make_wsgi_app`` for details).
        max_stream_response_bytes: See ``make_wsgi_app``.
        max_request_bytes: See ``make_wsgi_app``.
        authenticate: See ``make_wsgi_app``.
        default_headers: Headers merged into every request (e.g. auth tokens).
        upload_url_provider: See ``make_wsgi_app``.
        max_upload_bytes: See ``make_wsgi_app``.
        otel_config: See ``make_wsgi_app``.
        token_ttl: See ``make_wsgi_app``.

    Returns:
        A sync client that can be passed to ``http_connect(client=...)``.

    """
    app = make_wsgi_app(
        server,
        prefix=prefix,
        signing_key=signing_key,
        max_stream_response_bytes=max_stream_response_bytes,
        max_request_bytes=max_request_bytes,
        authenticate=authenticate,
        upload_url_provider=upload_url_provider,
        max_upload_bytes=max_upload_bytes,
        otel_config=otel_config,
        token_ttl=token_ttl,
    )
    return _SyncTestClient(app, default_headers=default_headers)

Header Constants

MAX_REQUEST_BYTES_HEADER module-attribute

MAX_REQUEST_BYTES_HEADER = 'VGI-Max-Request-Bytes'

MAX_UPLOAD_BYTES_HEADER module-attribute

MAX_UPLOAD_BYTES_HEADER = 'VGI-Max-Upload-Bytes'

UPLOAD_URL_HEADER module-attribute

UPLOAD_URL_HEADER = 'VGI-Upload-URL-Support'