Skip to content

HTTP Transport

HTTP transport using Falcon (server) and httpx (client). Requires pip install vgi-rpc[http].

Quick Start

Server

Create a WSGI app and serve it with any WSGI server (waitress, gunicorn, etc.):

from vgi_rpc import RpcServer, make_wsgi_app

server = RpcServer(MyService, MyServiceImpl())
app = make_wsgi_app(server)
# serve `app` with waitress, gunicorn, etc.

Client

from vgi_rpc import http_connect

with http_connect(MyService, "http://localhost:8080") as proxy:
    result = proxy.echo(message="hello")  # proxy is typed as MyService

Testing (no real server)

make_sync_client wraps a Falcon TestClient so you can test the full HTTP stack in-process:

from vgi_rpc import RpcServer
from vgi_rpc.http import http_connect, make_sync_client

server = RpcServer(MyService, MyServiceImpl())
client = make_sync_client(server)

with http_connect(MyService, client=client) as proxy:
    assert proxy.echo(message="hello") == "hello"

Landing Page

By default, GET {prefix} (e.g. GET /vgi) returns an HTML landing page showing the vgi-rpc logo, the protocol name, server ID, and links. When the server has enable_describe=True, the landing page includes a link to the describe page.

To disable the landing page:

app = make_wsgi_app(server, enable_landing_page=False)

POST {prefix} returns 405 Method Not Allowed — it does not interfere with RPC routing.

Describe Page

When the server has enable_describe=True, GET {prefix}/describe (e.g. GET /vgi/describe) returns an HTML page listing all methods, their parameters (name, type, default), return types, docstrings, and method type badges (UNARY / STREAM). The __describe__ introspection method is filtered out.

Both enable_describe=True on the RpcServer and enable_describe_page=True (the default) on make_wsgi_app() are required.

To disable only the HTML page while keeping the __describe__ RPC method available:

app = make_wsgi_app(server, enable_describe_page=False)

Reserved path

When the describe page is active, the path {prefix}/describe is reserved for the HTML page. If your service has an RPC method literally named describe, you must set enable_describe_page=False.

Not-Found Page

By default, make_wsgi_app() installs a friendly HTML 404 page for any request that does not match an RPC route. If someone navigates to the server root or a random path in a browser, they see the vgi-rpc logo, the service protocol name, and a link to vgi-rpc.query.farm instead of a generic error.

This does not affect RPC clients — a request to a valid RPC route for a non-existent method still returns a machine-readable Arrow IPC error with HTTP 404.

To disable the page:

app = make_wsgi_app(server, enable_not_found_page=False)

API Reference

Server

make_wsgi_app

make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_stream_response_time: float | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    cors_origins: str | Iterable[str] | None = None,
    cors_max_age: int | None = 7200,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: (
        OAuthResourceMetadata | None
    ) = None
) -> App[Request, Response]

Create a Falcon WSGI app that serves RPC requests over HTTP.

PARAMETER DESCRIPTION
server

The RpcServer instance to serve.

TYPE: RpcServer

prefix

URL prefix for all RPC endpoints (default "" — root).

TYPE: str DEFAULT: ''

signing_key

HMAC key for signing state tokens. When None (the default), a random 32-byte key is generated per process. This means state tokens issued by one worker are invalid in another — you must provide a shared key for multi-process deployments (e.g. gunicorn with multiple workers).

TYPE: bytes | None DEFAULT: None

max_stream_response_bytes

When set, producer stream responses may buffer multiple batches in a single HTTP response up to this size before emitting a continuation token. The client transparently resumes via POST /{method}/exchange. When None (the default) and max_stream_response_time is also None, each produce cycle emits one batch per HTTP response for incremental streaming.

TYPE: int | None DEFAULT: None

max_stream_response_time

When set, producer stream responses may buffer multiple batches up to this many seconds of wall time before emitting a continuation token. Can be combined with max_stream_response_bytes — the response breaks on whichever limit is reached first.

TYPE: float | None DEFAULT: None

max_request_bytes

When set, the value is advertised via the VGI-Max-Request-Bytes response header on every response (including OPTIONS). Clients can use http_capabilities() to discover this limit and decide whether to use external storage for large payloads. Advertisement only — no server-side enforcement. None (default) omits the header.

TYPE: int | None DEFAULT: None

authenticate

Optional callback that extracts an :class:AuthContext from a Falcon Request. When provided, every request is authenticated before dispatch. The callback should raise ValueError (bad credentials) or PermissionError (forbidden) on failure — these are mapped to HTTP 401. Other exceptions propagate as 500.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

cors_origins

Allowed origins for CORS. Pass "*" to allow all origins, a single origin string like "https://example.com", or an iterable of origin strings. None (the default) disables CORS headers. Uses Falcon's built-in CORSMiddleware which also handles preflight OPTIONS requests automatically.

TYPE: str | Iterable[str] | None DEFAULT: None

cors_max_age

Value for the Access-Control-Max-Age header on preflight OPTIONS responses, in seconds. 7200 (2 hours) by default. None omits the header. Only effective when cors_origins is set.

TYPE: int | None DEFAULT: 7200

upload_url_provider

Optional provider for generating pre-signed upload URLs. When set, the __upload_url__/init endpoint is enabled and VGI-Upload-URL-Support: true is advertised on every response.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

When set (and upload_url_provider is set), advertised via the VGI-Max-Upload-Bytes header. Informs clients of the maximum size they may upload to vended URLs. Advertisement only — no server-side enforcement.

TYPE: int | None DEFAULT: None

otel_config

Optional OtelConfig for OpenTelemetry instrumentation. When provided, instrument_server() is called and _OtelFalconMiddleware is prepended for W3C trace propagation. Requires pip install vgi-rpc[otel].

TYPE: object | None DEFAULT: None

sentry_config

Optional SentryConfig for Sentry error reporting. When provided, instrument_server_sentry() is called. Requires pip install vgi-rpc[sentry].

TYPE: object | None DEFAULT: None

token_ttl

Maximum age of stream state tokens in seconds. Tokens older than this are rejected with HTTP 400. Default is 3600 (1 hour). Set to 0 to disable expiry checking.

TYPE: int DEFAULT: 3600

compression_level

Zstandard compression level for HTTP request/ response bodies. 3 (the default) installs _CompressionMiddleware at level 3. Valid range is 1-22. None disables compression entirely.

TYPE: int | None DEFAULT: 3

enable_not_found_page

When True (the default), requests to paths that do not match any RPC route receive a friendly HTML 404 page. Set to False to use Falcon's default 404 behaviour instead.

TYPE: bool DEFAULT: True

enable_landing_page

When True (the default), GET {prefix} returns a friendly HTML landing page showing the protocol name, server ID, and links. Set to False to disable.

TYPE: bool DEFAULT: True

enable_describe_page

When True (the default) and the server has enable_describe=True, GET {prefix}/describe returns an HTML page listing all methods, parameters, and types. The path {prefix}/describe is reserved when active — an RPC method named describe would need the page disabled.

TYPE: bool DEFAULT: True

repo_url

Optional URL to the service's source repository (e.g. a GitHub URL). When provided, a "Source repository" link appears on the landing page and describe page.

TYPE: str | None DEFAULT: None

oauth_resource_metadata

Optional OAuthResourceMetadata for RFC 9728 OAuth discovery. When provided, serves /.well-known/oauth-protected-resource and adds WWW-Authenticate: Bearer resource_metadata="..." to 401 responses.

TYPE: OAuthResourceMetadata | None DEFAULT: None

RETURNS DESCRIPTION
App[Request, Response]

A Falcon application with routes for unary and stream RPC calls.

Source code in vgi_rpc/http/_server.py
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
2227
2228
2229
2230
2231
2232
2233
2234
2235
2236
2237
2238
2239
2240
2241
2242
2243
2244
2245
2246
2247
2248
2249
2250
2251
2252
2253
2254
2255
2256
2257
2258
2259
2260
2261
2262
2263
2264
2265
2266
2267
2268
2269
2270
2271
2272
2273
2274
2275
2276
2277
2278
2279
2280
2281
2282
2283
2284
2285
2286
2287
2288
2289
2290
2291
2292
2293
2294
2295
2296
2297
2298
2299
2300
2301
2302
2303
2304
2305
2306
2307
2308
2309
2310
2311
2312
2313
2314
2315
2316
2317
2318
2319
2320
2321
2322
2323
2324
2325
2326
2327
2328
2329
2330
2331
2332
2333
2334
2335
2336
2337
2338
def make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_stream_response_time: float | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    cors_origins: str | Iterable[str] | None = None,
    cors_max_age: int | None = 7200,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: OAuthResourceMetadata | None = None,
) -> falcon.App[falcon.Request, falcon.Response]:
    """Create a Falcon WSGI app that serves RPC requests over HTTP.

    Args:
        server: The RpcServer instance to serve.
        prefix: URL prefix for all RPC endpoints (default ``""`` — root).
        signing_key: HMAC key for signing state tokens.  When ``None``
            (the default), a random 32-byte key is generated **per process**.
            This means state tokens issued by one worker are invalid in
            another — you **must** provide a shared key for multi-process
            deployments (e.g. gunicorn with multiple workers).
        max_stream_response_bytes: When set, producer stream responses may
            buffer multiple batches in a single HTTP response up to this
            size before emitting a continuation token.  The client
            transparently resumes via ``POST /{method}/exchange``.
            When ``None`` (the default) and ``max_stream_response_time``
            is also ``None``, each produce cycle emits one batch per HTTP
            response for incremental streaming.
        max_stream_response_time: When set, producer stream responses may
            buffer multiple batches up to this many seconds of wall time
            before emitting a continuation token.  Can be combined with
            ``max_stream_response_bytes`` — the response breaks on
            whichever limit is reached first.
        max_request_bytes: When set, the value is advertised via the
            ``VGI-Max-Request-Bytes`` response header on every response
            (including OPTIONS).  Clients can use ``http_capabilities()``
            to discover this limit and decide whether to use external
            storage for large payloads.  Advertisement only — no
            server-side enforcement.  ``None`` (default) omits the header.
        authenticate: Optional callback that extracts an :class:`AuthContext`
            from a Falcon ``Request``.  When provided, every request is
            authenticated before dispatch.  The callback should raise
            ``ValueError`` (bad credentials) or ``PermissionError``
            (forbidden) on failure — these are mapped to HTTP 401.
            Other exceptions propagate as 500.
        cors_origins: Allowed origins for CORS.  Pass ``"*"`` to allow all
            origins, a single origin string like ``"https://example.com"``,
            or an iterable of origin strings.  ``None`` (the default)
            disables CORS headers.  Uses Falcon's built-in
            ``CORSMiddleware`` which also handles preflight OPTIONS
            requests automatically.
        cors_max_age: Value for the ``Access-Control-Max-Age`` header on
            preflight OPTIONS responses, in seconds.  ``7200`` (2 hours)
            by default.  ``None`` omits the header.  Only effective when
            ``cors_origins`` is set.
        upload_url_provider: Optional provider for generating pre-signed
            upload URLs.  When set, the ``__upload_url__/init`` endpoint
            is enabled and ``VGI-Upload-URL-Support: true`` is advertised
            on every response.
        max_upload_bytes: When set (and ``upload_url_provider`` is set),
            advertised via the ``VGI-Max-Upload-Bytes`` header.  Informs
            clients of the maximum size they may upload to vended URLs.
            Advertisement only — no server-side enforcement.
        otel_config: Optional ``OtelConfig`` for OpenTelemetry instrumentation.
            When provided, ``instrument_server()`` is called and
            ``_OtelFalconMiddleware`` is prepended for W3C trace propagation.
            Requires ``pip install vgi-rpc[otel]``.
        sentry_config: Optional ``SentryConfig`` for Sentry error reporting.
            When provided, ``instrument_server_sentry()`` is called.
            Requires ``pip install vgi-rpc[sentry]``.
        token_ttl: Maximum age of stream state tokens in seconds.  Tokens
            older than this are rejected with HTTP 400.  Default is 3600
            (1 hour).  Set to ``0`` to disable expiry checking.
        compression_level: Zstandard compression level for HTTP request/
            response bodies.  ``3`` (the default) installs
            ``_CompressionMiddleware`` at level 3.  Valid range is 1-22.
            ``None`` disables compression entirely.
        enable_not_found_page: When ``True`` (the default), requests to
            paths that do not match any RPC route receive a friendly HTML
            404 page.  Set to ``False`` to use Falcon's default 404
            behaviour instead.
        enable_landing_page: When ``True`` (the default), ``GET {prefix}``
            returns a friendly HTML landing page showing the protocol name,
            server ID, and links.  Set to ``False`` to disable.
        enable_describe_page: When ``True`` (the default) **and** the server
            has ``enable_describe=True``, ``GET {prefix}/describe`` returns
            an HTML page listing all methods, parameters, and types.  The
            path ``{prefix}/describe`` is reserved when active — an RPC
            method named ``describe`` would need the page disabled.
        repo_url: Optional URL to the service's source repository (e.g. a
            GitHub URL).  When provided, a "Source repository" link appears
            on the landing page and describe page.
        oauth_resource_metadata: Optional ``OAuthResourceMetadata`` for
            RFC 9728 OAuth discovery.  When provided, serves
            ``/.well-known/oauth-protected-resource`` and adds
            ``WWW-Authenticate: Bearer resource_metadata="..."`` to 401
            responses.

    Returns:
        A Falcon application with routes for unary and stream RPC calls.

    """
    if signing_key is None:
        warnings.warn(
            "No signing_key provided; generating a random per-process key. "
            "State tokens will be invalid across workers — pass a shared key "
            "for multi-process deployments.",
            stacklevel=2,
        )
        signing_key = os.urandom(32)
    # OpenTelemetry instrumentation (optional)
    if otel_config is not None:
        from vgi_rpc.otel import OtelConfig, _OtelFalconMiddleware, instrument_server

        if not isinstance(otel_config, OtelConfig):
            raise TypeError(f"otel_config must be an OtelConfig instance, got {type(otel_config).__name__}")
        instrument_server(server, otel_config)

    # Sentry error reporting (optional)
    if sentry_config is not None:
        from vgi_rpc.sentry import SentryConfig, instrument_server_sentry

        if not isinstance(sentry_config, SentryConfig):
            raise TypeError(f"sentry_config must be a SentryConfig instance, got {type(sentry_config).__name__}")
        instrument_server_sentry(server, sentry_config)

    app_handler = _HttpRpcApp(
        server,
        signing_key,
        max_stream_response_bytes,
        max_stream_response_time,
        max_request_bytes,
        upload_url_provider,
        max_upload_bytes,
        token_ttl,
    )
    middleware: list[Any] = [_DrainRequestMiddleware(), _RequestIdMiddleware()]

    # Compression middleware decompresses request bodies and compresses
    # responses — must come before auth so handlers read plaintext bodies.
    if compression_level is not None:
        middleware.append(_CompressionMiddleware(compression_level))

    # OTel middleware must come before auth so spans cover the full request
    if otel_config is not None:
        middleware.append(_OtelFalconMiddleware())

    # Always expose auth and request-id headers; capability headers are
    # appended conditionally below.
    cors_expose: list[str] = ["WWW-Authenticate", _REQUEST_ID_HEADER, "X-VGI-Content-Encoding", RPC_ERROR_HEADER]

    # Build capability headers
    capability_headers: dict[str, str] = {}
    if max_request_bytes is not None:
        capability_headers[MAX_REQUEST_BYTES_HEADER] = str(max_request_bytes)
        cors_expose.append(MAX_REQUEST_BYTES_HEADER)
    if upload_url_provider is not None:
        capability_headers[UPLOAD_URL_HEADER] = "true"
        cors_expose.append(UPLOAD_URL_HEADER)
        if max_upload_bytes is not None:
            capability_headers[MAX_UPLOAD_BYTES_HEADER] = str(max_upload_bytes)
            cors_expose.append(MAX_UPLOAD_BYTES_HEADER)

    # OAuth resource metadata (RFC 9728)
    from vgi_rpc.http._oauth import OAuthResourceMetadata as _OAuthMeta
    from vgi_rpc.http._oauth import _build_www_authenticate

    www_authenticate: str | None = None
    _validated_oauth_metadata: _OAuthMeta | None = None
    if oauth_resource_metadata is not None:
        if not isinstance(oauth_resource_metadata, _OAuthMeta):
            raise TypeError(
                f"oauth_resource_metadata must be an OAuthResourceMetadata instance, "
                f"got {type(oauth_resource_metadata).__name__}"
            )
        _validated_oauth_metadata = oauth_resource_metadata
        www_authenticate = _build_www_authenticate(_validated_oauth_metadata, prefix)

    if cors_origins is not None:
        cors_kwargs: dict[str, Any] = {
            "allow_origins": cors_origins,
            "expose_headers": cors_expose,
        }
        middleware.append(falcon.CORSMiddleware(**cors_kwargs))
        if cors_max_age is not None:
            middleware.append(_CorsMaxAgeMiddleware(cors_max_age))
    # OAuth PKCE browser flow — only when authenticate + OAuth metadata + client_id
    _pkce_active = False
    _pkce_user_info_html: str | None = None
    _exempt_prefixes: tuple[str, ...] = ()
    if (
        authenticate is not None
        and _validated_oauth_metadata is not None
        and _validated_oauth_metadata.client_id is not None
    ):
        from urllib.parse import urlparse as _urlparse

        from vgi_rpc.http._bearer import chain_authenticate
        from vgi_rpc.http._oauth_pkce import (
            _create_oidc_discovery,
            _derive_session_key,
            _OAuthCallbackResource,
            _OAuthLogoutResource,
            _OAuthPkceMiddleware,
            build_user_info_html,
            make_cookie_authenticate,
        )

        _pkce_issuer = _validated_oauth_metadata.authorization_servers[0]
        _pkce_oidc_discovery = _create_oidc_discovery(_pkce_issuer)
        _pkce_session_key = _derive_session_key(signing_key)
        _pkce_resource_parsed = _urlparse(_validated_oauth_metadata.resource)
        _pkce_secure = _pkce_resource_parsed.scheme == "https"
        _pkce_redirect_uri = f"{_pkce_resource_parsed.scheme}://{_pkce_resource_parsed.netloc}{prefix}/_oauth/callback"

        if not _pkce_secure and _pkce_resource_parsed.hostname not in ("localhost", "127.0.0.1", "::1"):
            _logger.warning(
                "OAuth PKCE is configured without HTTPS (%s) — cookies will not be Secure. "
                "This is acceptable for local development but not for production.",
                _validated_oauth_metadata.resource,
            )

        # Wrap authenticate to also accept tokens from a cookie
        _pkce_cookie_auth = make_cookie_authenticate(authenticate)
        authenticate = chain_authenticate(authenticate, _pkce_cookie_auth)

        _pkce_client_id: str = _validated_oauth_metadata.client_id
        _pkce_client_secret = _validated_oauth_metadata.client_secret
        _pkce_use_id_token = _validated_oauth_metadata.use_id_token_as_bearer
        _exempt_prefixes = (f"{prefix}/_oauth/",)
        _pkce_active = True
        _pkce_user_info_html = build_user_info_html(prefix)

    if authenticate is not None:
        on_auth_failure: Callable[[str | None, str], None] | None = None
        if otel_config is not None:
            from vgi_rpc.otel import OtelConfig as _OtelCfg
            from vgi_rpc.otel import make_auth_failure_counter

            assert isinstance(otel_config, _OtelCfg)  # validated above
            on_auth_failure = make_auth_failure_counter(otel_config, server.protocol_name)
        middleware.append(
            _AuthMiddleware(
                authenticate,
                www_authenticate=www_authenticate,
                on_auth_failure=on_auth_failure,
                exempt_prefixes=_exempt_prefixes,
            )
        )
        if _pkce_active:
            middleware.append(
                _OAuthPkceMiddleware(
                    session_key=_pkce_session_key,
                    oidc_discovery=_pkce_oidc_discovery,
                    client_id=_pkce_client_id,
                    prefix=prefix,
                    secure_cookie=_pkce_secure,
                    redirect_uri=_pkce_redirect_uri,
                )
            )
    if capability_headers:
        middleware.append(_CapabilitiesMiddleware(capability_headers))
    app: falcon.App[falcon.Request, falcon.Response] = falcon.App(middleware=middleware or None)
    app.set_error_serializer(_error_serializer)

    # OAuth well-known endpoint (must be before RPC routes)
    if _validated_oauth_metadata is not None:
        from vgi_rpc.http._oauth import _OAuthResourceMetadataResource

        well_known = _OAuthResourceMetadataResource(_validated_oauth_metadata)
        app.add_route("/.well-known/oauth-protected-resource", well_known)
        if prefix and prefix != "/":
            app.add_route(f"/.well-known/oauth-protected-resource{prefix}", well_known)

    app.add_route(f"{prefix}/{{method}}", _RpcResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/init", _StreamInitResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/exchange", _ExchangeResource(app_handler))
    if upload_url_provider is not None:
        app.add_route(f"{prefix}/__upload_url__/init", _UploadUrlResource(app_handler))

    # OAuth PKCE callback and logout routes (must be before not-found sink)
    if _pkce_active:
        app.add_route(
            f"{prefix}/_oauth/callback",
            _OAuthCallbackResource(
                session_key=_pkce_session_key,
                oidc_discovery=_pkce_oidc_discovery,
                client_id=_pkce_client_id,
                client_secret=_pkce_client_secret,
                use_id_token=_pkce_use_id_token,
                prefix=prefix,
                secure_cookie=_pkce_secure,
                redirect_uri=_pkce_redirect_uri,
            ),
        )
        app.add_route(f"{prefix}/_oauth/logout", _OAuthLogoutResource(prefix, _pkce_secure))

    # Describe page — GET {prefix}/describe (requires both flags and server support)
    describe_page_active = enable_describe_page and server.describe_enabled
    if describe_page_active:
        describe_html = _build_describe_html(server, prefix, repo_url)
        if _pkce_user_info_html:
            describe_html = describe_html.replace(b"</body>", _pkce_user_info_html.encode() + b"\n</body>")
        app.add_route(f"{prefix}/describe", _DescribePageResource(describe_html))

    # Landing page — GET {prefix}
    if enable_landing_page:
        describe_path = f"{prefix}/describe" if describe_page_active else None
        landing_body = _build_landing_html(prefix, server.protocol_name, server.server_id, describe_path, repo_url)
        if _pkce_user_info_html:
            landing_body = landing_body.replace(b"</body>", _pkce_user_info_html.encode() + b"\n</body>")
        app.add_route(prefix or "/", _LandingPageResource(landing_body))

    if enable_not_found_page:
        app.add_sink(_make_not_found_sink(prefix, server.protocol_name))

    _logger.info(
        "WSGI app created for %s (server_id=%s, prefix=%s, auth=%s)",
        server.protocol_name,
        server.server_id,
        prefix,
        "enabled" if authenticate is not None else "disabled",
        extra={
            "server_id": server.server_id,
            "protocol": server.protocol_name,
            "prefix": prefix,
            "auth_enabled": authenticate is not None,
        },
    )

    return app

Client

http_connect

http_connect(
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    on_log: Callable[[Message], None] | None = None,
    client: Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None,
    compression_level: int | None = 3
) -> Iterator[P]

Connect to an HTTP RPC server and yield a typed proxy.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None; ignored when a pre-built client is provided. The internally-created client follows redirects transparently.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. When None (the default), auto-detected from a _SyncTestClient's .prefix attribute, or "" for other clients.

TYPE: str | None DEFAULT: None

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

client

Optional HTTP client — httpx.Client for production, or a _SyncTestClient from make_sync_client() for testing.

TYPE: Client | _SyncTestClient | None DEFAULT: None

external_location

Optional ExternalLocationConfig for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures. When None (the default), no retries are attempted.

TYPE: HttpRetryConfig | None DEFAULT: None

compression_level

Zstandard compression level for request bodies. 3 (the default) compresses requests and adds Content-Encoding: zstd. None disables request compression (httpx still auto-decompresses server responses).

TYPE: int | None DEFAULT: 3

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
@contextlib.contextmanager
def http_connect[P](
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    on_log: Callable[[Message], None] | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
    compression_level: int | None = 3,
) -> Iterator[P]:
    """Connect to an HTTP RPC server and yield a typed proxy.

    Args:
        protocol: The Protocol class defining the RPC interface.
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``; ignored when a pre-built
            *client* is provided.  The internally-created client follows
            redirects transparently.
        prefix: URL prefix matching the server's prefix.  When ``None``
            (the default), auto-detected from a ``_SyncTestClient``'s
            ``.prefix`` attribute, or ``""`` for other clients.
        on_log: Optional callback for log messages from the server.
        client: Optional HTTP client — ``httpx.Client`` for production,
            or a ``_SyncTestClient`` from ``make_sync_client()`` for testing.
        external_location: Optional ExternalLocationConfig for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.
            When ``None`` (the default), no retries are attempted.
        compression_level: Zstandard compression level for request bodies.
            ``3`` (the default) compresses requests and adds
            ``Content-Encoding: zstd``.  ``None`` disables request
            compression (httpx still auto-decompresses server responses).

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    # Auto-detect prefix from _SyncTestClient when not explicitly provided
    url_prefix = getattr(client, "prefix", "") if prefix is None else prefix
    try:
        yield cast(
            P,
            _HttpProxy(
                protocol,
                client,
                url_prefix,
                on_log,
                external_config=external_location,
                ipc_validation=ipc_validation,
                retry_config=retry,
                compression_level=compression_level,
            ),
        )
    finally:
        if own_client:
            client.close()

http_introspect

http_introspect(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None
) -> ServiceDescription

Send a __describe__ request over HTTP and return a ServiceDescription.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. None auto-detects from _SyncTestClient.

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
ServiceDescription

A ServiceDescription with all method metadata.

RAISES DESCRIPTION
RpcError

If the server does not support introspection or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_introspect(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
) -> ServiceDescription:
    """Send a ``__describe__`` request over HTTP and return a ``ServiceDescription``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A ``ServiceDescription`` with all method metadata.

    Raises:
        RpcError: If the server does not support introspection or returns
            an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    from vgi_rpc.introspect import DESCRIBE_METHOD_NAME, parse_describe_batch

    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        # Build a minimal request: empty params with __describe__ method name
        req_buf = BytesIO()
        request_metadata = pa.KeyValueMetadata(
            {
                b"vgi_rpc.method": DESCRIBE_METHOD_NAME.encode(),
                b"vgi_rpc.request_version": b"1",
            }
        )
        with ipc.new_stream(req_buf, _EMPTY_SCHEMA) as writer:
            writer.write_batch(
                pa.RecordBatch.from_pydict({}, schema=_EMPTY_SCHEMA),
                custom_metadata=request_metadata,
            )

        resp = _post_with_retry(
            client,
            f"{prefix}/{DESCRIBE_METHOD_NAME}",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        reader = _open_response_stream(resp.content, resp.status_code, ipc_validation)
        # Skip log batches
        while True:
            batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            if not _dispatch_log_or_error(batch, custom_metadata):
                break
        _drain_stream(reader)

        return parse_describe_batch(batch, custom_metadata)
    finally:
        if own_client:
            client.close()

http_capabilities

http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> HttpServerCapabilities

Discover server capabilities via an OPTIONS request.

Sends OPTIONS {prefix}/__capabilities__ and reads capability headers (VGI-Max-Request-Bytes, VGI-Upload-URL-Support, VGI-Max-Upload-Bytes) from the response.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. None auto-detects from _SyncTestClient.

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
HttpServerCapabilities

An HttpServerCapabilities with discovered values.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> HttpServerCapabilities:
    """Discover server capabilities via an OPTIONS request.

    Sends ``OPTIONS {prefix}/__capabilities__`` and reads capability
    headers (``VGI-Max-Request-Bytes``, ``VGI-Upload-URL-Support``,
    ``VGI-Max-Upload-Bytes``) from the response.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        An ``HttpServerCapabilities`` with discovered values.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        url = f"{prefix}/__capabilities__"
        resp = _options_with_retry(client, url, config=retry)
        headers = resp.headers

        max_req: int | None = None
        raw = headers.get(MAX_REQUEST_BYTES_HEADER) or headers.get(MAX_REQUEST_BYTES_HEADER.lower())
        if raw is not None:
            with contextlib.suppress(ValueError):
                max_req = int(raw)

        upload_raw = headers.get(UPLOAD_URL_HEADER) or headers.get(UPLOAD_URL_HEADER.lower())
        upload_support = upload_raw == "true" if upload_raw is not None else False

        max_upload: int | None = None
        upload_bytes_raw = headers.get(MAX_UPLOAD_BYTES_HEADER) or headers.get(MAX_UPLOAD_BYTES_HEADER.lower())
        if upload_bytes_raw is not None:
            with contextlib.suppress(ValueError):
                max_upload = int(upload_bytes_raw)

        return HttpServerCapabilities(
            max_request_bytes=max_req,
            upload_url_support=upload_support,
            max_upload_bytes=max_upload,
        )
    finally:
        if own_client:
            client.close()

request_upload_urls

request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> list[UploadUrl]

Request pre-signed upload URLs from the server's __upload_url__ endpoint.

The server must have been configured with an upload_url_provider in make_wsgi_app().

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

count

Number of upload URLs to request (default 1, max 100).

TYPE: int DEFAULT: 1

prefix

URL prefix matching the server's prefix. None auto-detects from _SyncTestClient.

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
list[UploadUrl]

A list of UploadUrl objects with pre-signed PUT and GET URLs.

RAISES DESCRIPTION
RpcError

If the server does not support upload URLs (404) or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> list[UploadUrl]:
    """Request pre-signed upload URLs from the server's ``__upload_url__`` endpoint.

    The server must have been configured with an ``upload_url_provider``
    in ``make_wsgi_app()``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        count: Number of upload URLs to request (default 1, max 100).
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A list of ``UploadUrl`` objects with pre-signed PUT and GET URLs.

    Raises:
        RpcError: If the server does not support upload URLs (404) or
            returns an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        # Build request IPC with standard wire protocol metadata
        req_buf = BytesIO()
        _write_request(req_buf, _UPLOAD_URL_METHOD, _UPLOAD_URL_PARAMS_SCHEMA, {"count": count})

        resp = _post_with_retry(
            client,
            f"{prefix}/__upload_url__/init",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        # Without an upload_url_provider the route doesn't exist and the
        # request falls through to _StreamInitResource → 404.
        if resp.status_code == HTTPStatus.NOT_FOUND:
            raise RpcError("NotSupported", "Server does not support upload URLs", "")

        reader = _open_response_stream(resp.content, resp.status_code)
        urls: list[UploadUrl] = []
        try:
            while True:
                try:
                    batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
                except StopIteration:
                    break

                if _dispatch_log_or_error(batch, custom_metadata):
                    continue

                for i in range(batch.num_rows):
                    upload_url = batch.column("upload_url")[i].as_py()
                    download_url = batch.column("download_url")[i].as_py()
                    expires_at = batch.column("expires_at")[i].as_py()
                    urls.append(UploadUrl(upload_url=upload_url, download_url=download_url, expires_at=expires_at))
        except RpcError:
            _drain_stream(reader)
            raise
        _drain_stream(reader)
        return urls
    finally:
        if own_client:
            client.close()

Capabilities

HttpServerCapabilities dataclass

HttpServerCapabilities(
    max_request_bytes: int | None = None,
    upload_url_support: bool = False,
    max_upload_bytes: int | None = None,
)

Capabilities advertised by an HTTP RPC server.

ATTRIBUTE DESCRIPTION
max_request_bytes

Maximum request body size the server advertises, or None if the server does not advertise a limit. Advertisement only -- no server-side enforcement.

TYPE: int | None

upload_url_support

Whether the server supports the __upload_url__ endpoint for client-side uploads.

TYPE: bool

max_upload_bytes

Maximum upload size the server advertises for client-vended URLs, or None if not advertised. Advertisement only -- no server-side enforcement.

TYPE: int | None

Stream Session

HttpStreamSession

HttpStreamSession(
    client: Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
    compression_level: int | None = None
)

Client-side handle for a stream over HTTP (both producer and exchange patterns).

For producer streams, use __iter__() — yields batches from batched responses and follows continuation tokens transparently. For exchange streams, use exchange() — sends an input batch and receives an output batch.

Supports context manager protocol for convenience.

Initialize with HTTP client, method details, and initial state.

Source code in vgi_rpc/http/_client.py
def __init__(
    self,
    client: httpx.Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: pa.Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
    compression_level: int | None = None,
) -> None:
    """Initialize with HTTP client, method details, and initial state."""
    self._client = client
    self._url_prefix = url_prefix
    self._method = method
    self._state_bytes = state_bytes
    self._output_schema = output_schema
    self._on_log = on_log
    self._external_config = external_config
    self._ipc_validation = ipc_validation
    self._pending_batches: list[AnnotatedBatch] = pending_batches or []
    self._finished = finished
    self._header = header
    self._retry_config = retry_config
    self._compression_level = compression_level

header property

header: object | None

The stream header, or None if the stream has no header.

typed_header

typed_header(header_type: type[H]) -> H

Return the stream header narrowed to the expected type.

PARAMETER DESCRIPTION
header_type

The expected header dataclass type.

TYPE: type[H]

RETURNS DESCRIPTION
H

The header, typed as header_type.

RAISES DESCRIPTION
TypeError

If the header is None or not an instance of header_type.

Source code in vgi_rpc/http/_client.py
def typed_header[H: ArrowSerializableDataclass](self, header_type: type[H]) -> H:
    """Return the stream header narrowed to the expected type.

    Args:
        header_type: The expected header dataclass type.

    Returns:
        The header, typed as *header_type*.

    Raises:
        TypeError: If the header is ``None`` or not an instance of
            *header_type*.

    """
    if self._header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(self._header, header_type):
        raise TypeError(f"Header type mismatch: expected {header_type.__name__}, got {type(self._header).__name__}")
    return self._header

exchange

exchange(input_batch: AnnotatedBatch) -> AnnotatedBatch

Send an input batch and receive the output batch.

PARAMETER DESCRIPTION
input_batch

The input batch to send.

TYPE: AnnotatedBatch

RETURNS DESCRIPTION
AnnotatedBatch

The output batch from the server.

RAISES DESCRIPTION
RpcError

If the server reports an error or the stream has finished.

Source code in vgi_rpc/http/_client.py
def exchange(self, input_batch: AnnotatedBatch) -> AnnotatedBatch:
    """Send an input batch and receive the output batch.

    Args:
        input_batch: The input batch to send.

    Returns:
        The output batch from the server.

    Raises:
        RpcError: If the server reports an error or the stream has finished.

    """
    if self._state_bytes is None:
        raise RpcError("ProtocolError", "Stream has finished — no state token available", "")

    batch_to_write = input_batch.batch
    cm_to_write = input_batch.custom_metadata

    # Client-side externalization for large inputs
    if self._external_config is not None:
        batch_to_write, cm_to_write = maybe_externalize_batch(batch_to_write, cm_to_write, self._external_config)

    # Write input batch with state in metadata
    req_buf = BytesIO()
    state_md = pa.KeyValueMetadata({STATE_KEY: self._state_bytes})
    merged = merge_metadata(cm_to_write, state_md)
    with ipc.new_stream(req_buf, batch_to_write.schema) as writer:
        writer.write_batch(batch_to_write, custom_metadata=merged)

    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange: method=%s, input=%s",
            self._method,
            fmt_batch(batch_to_write),
        )
    # Exchange calls are NOT retried: the server's process() method may
    # have side effects, and a proxy 502 after server processing would
    # cause duplicate execution.  Only init/unary/continuation are retried.
    resp = self._client.post(
        f"{self._url_prefix}/{self._method}/exchange",
        content=self._prepare_body(req_buf.getvalue()),
        headers=self._build_headers(),
    )
    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange response: method=%s, status=%d, size=%d",
            self._method,
            resp.status_code,
            len(resp.content),
        )

    # Read response — log batches + data batch with state
    reader = _open_response_stream(resp.content, resp.status_code, self._ipc_validation)
    try:
        ab = _read_batch_with_log_check(reader, self._on_log, self._external_config)
    except RpcError:
        _drain_stream(reader)
        raise

    # Extract updated state from metadata
    if ab.custom_metadata is not None:
        new_state = ab.custom_metadata.get(STATE_KEY)
        if new_state is not None:
            self._state_bytes = new_state

    # Strip state token from user-visible metadata
    user_cm = strip_keys(ab.custom_metadata, STATE_KEY)

    _drain_stream(reader)
    return AnnotatedBatch(batch=ab.batch, custom_metadata=user_cm)

__iter__

__iter__() -> Iterator[AnnotatedBatch]

Iterate over output batches from a producer stream.

Yields pre-loaded batches from init, then follows continuation tokens.

Source code in vgi_rpc/http/_client.py
def __iter__(self) -> Iterator[AnnotatedBatch]:
    """Iterate over output batches from a producer stream.

    Yields pre-loaded batches from init, then follows continuation tokens.
    """
    # Yield pre-loaded batches from init response
    yield from self._pending_batches
    self._pending_batches.clear()

    if self._finished:
        return

    # Follow continuation tokens
    if self._state_bytes is None:
        return

    reader: ValidatedReader | None = None
    try:
        reader = self._send_continuation(self._state_bytes)
        while True:
            try:
                batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            except StopIteration:
                break

            # Check for continuation token (zero-row batch with STATE_KEY)
            if batch.num_rows == 0 and custom_metadata is not None:
                token = custom_metadata.get(STATE_KEY)
                if token is not None:
                    if not isinstance(token, bytes):
                        raise TypeError(f"Expected bytes for state token, got {type(token).__name__}")
                    _drain_stream(reader)
                    reader = self._send_continuation(token)
                    continue

            # Dispatch log/error batches
            if _dispatch_log_or_error(batch, custom_metadata, self._on_log):
                continue

            resolved_batch, resolved_cm = resolve_external_location(
                batch, custom_metadata, self._external_config, self._on_log, reader.ipc_validation
            )
            yield AnnotatedBatch(batch=resolved_batch, custom_metadata=resolved_cm)
    except RpcError:
        if reader is not None:
            _drain_stream(reader)
        raise

close

close() -> None

Close the session (no-op for HTTP — stateless).

Source code in vgi_rpc/http/_client.py
def close(self) -> None:
    """Close the session (no-op for HTTP — stateless)."""

__enter__

__enter__() -> HttpStreamSession

Enter the context.

Source code in vgi_rpc/http/_client.py
def __enter__(self) -> HttpStreamSession:
    """Enter the context."""
    return self

__exit__

__exit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None

Exit the context.

Source code in vgi_rpc/http/_client.py
def __exit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None:
    """Exit the context."""
    self.close()

Testing

make_sync_client

make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: (
        OAuthResourceMetadata | None
    ) = None
) -> _SyncTestClient

Create a synchronous test client for an RpcServer.

Uses falcon.testing.TestClient internally — no real HTTP server needed.

PARAMETER DESCRIPTION
server

The RpcServer to test.

TYPE: RpcServer

prefix

URL prefix for RPC endpoints (default "" — root).

TYPE: str DEFAULT: ''

signing_key

HMAC key for signing state tokens (see make_wsgi_app for details).

TYPE: bytes | None DEFAULT: None

max_stream_response_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_request_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

authenticate

See make_wsgi_app.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

default_headers

Headers merged into every request (e.g. auth tokens).

TYPE: dict[str, str] | None DEFAULT: None

upload_url_provider

See make_wsgi_app.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

otel_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

sentry_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

token_ttl

See make_wsgi_app.

TYPE: int DEFAULT: 3600

compression_level

See make_wsgi_app.

TYPE: int | None DEFAULT: 3

enable_not_found_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_landing_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_describe_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

repo_url

See make_wsgi_app.

TYPE: str | None DEFAULT: None

oauth_resource_metadata

See make_wsgi_app.

TYPE: OAuthResourceMetadata | None DEFAULT: None

RETURNS DESCRIPTION
_SyncTestClient

A sync client that can be passed to http_connect(client=...).

Source code in vgi_rpc/http/_testing.py
def make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "",
    signing_key: bytes | None = None,
    max_stream_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: OAuthResourceMetadata | None = None,
) -> _SyncTestClient:
    """Create a synchronous test client for an RpcServer.

    Uses ``falcon.testing.TestClient`` internally — no real HTTP server needed.

    Args:
        server: The RpcServer to test.
        prefix: URL prefix for RPC endpoints (default ``""`` — root).
        signing_key: HMAC key for signing state tokens (see
            ``make_wsgi_app`` for details).
        max_stream_response_bytes: See ``make_wsgi_app``.
        max_request_bytes: See ``make_wsgi_app``.
        authenticate: See ``make_wsgi_app``.
        default_headers: Headers merged into every request (e.g. auth tokens).
        upload_url_provider: See ``make_wsgi_app``.
        max_upload_bytes: See ``make_wsgi_app``.
        otel_config: See ``make_wsgi_app``.
        sentry_config: See ``make_wsgi_app``.
        token_ttl: See ``make_wsgi_app``.
        compression_level: See ``make_wsgi_app``.
        enable_not_found_page: See ``make_wsgi_app``.
        enable_landing_page: See ``make_wsgi_app``.
        enable_describe_page: See ``make_wsgi_app``.
        repo_url: See ``make_wsgi_app``.
        oauth_resource_metadata: See ``make_wsgi_app``.

    Returns:
        A sync client that can be passed to ``http_connect(client=...)``.

    """
    app = make_wsgi_app(
        server,
        prefix=prefix,
        signing_key=signing_key,
        max_stream_response_bytes=max_stream_response_bytes,
        max_request_bytes=max_request_bytes,
        authenticate=authenticate,
        upload_url_provider=upload_url_provider,
        max_upload_bytes=max_upload_bytes,
        otel_config=otel_config,
        sentry_config=sentry_config,
        token_ttl=token_ttl,
        compression_level=compression_level,
        enable_not_found_page=enable_not_found_page,
        enable_landing_page=enable_landing_page,
        enable_describe_page=enable_describe_page,
        repo_url=repo_url,
        oauth_resource_metadata=oauth_resource_metadata,
    )
    return _SyncTestClient(app, default_headers=default_headers, prefix=prefix)

Header Constants

MAX_REQUEST_BYTES_HEADER module-attribute

MAX_REQUEST_BYTES_HEADER = 'VGI-Max-Request-Bytes'

MAX_UPLOAD_BYTES_HEADER module-attribute

MAX_UPLOAD_BYTES_HEADER = 'VGI-Max-Upload-Bytes'

UPLOAD_URL_HEADER module-attribute

UPLOAD_URL_HEADER = 'VGI-Upload-URL-Support'