HTTP Transport

HTTP transport using Falcon (server) and httpx (client). Requires pip install vgi-rpc[http].

Quick Start

Server

Create a WSGI app and serve it with any WSGI server (waitress, gunicorn, etc.):

from vgi_rpc import RpcServer, make_wsgi_app

server = RpcServer(MyService, MyServiceImpl())
app = make_wsgi_app(server)
# serve `app` with waitress, gunicorn, etc.
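
As a minimal sketch of the "any WSGI server" point, the standard library's wsgiref can host a WSGI callable during local development. The placeholder_app below is a hypothetical stand-in so the snippet runs without vgi-rpc installed; in practice you would pass the app returned by make_wsgi_app(server). wsgiref is a single-threaded reference server, so treat this as a development aid rather than a deployment recipe.

```python
from wsgiref.simple_server import make_server


def placeholder_app(environ, start_response):
    """Hypothetical stand-in for the app returned by make_wsgi_app(server)."""
    body = b"placeholder"
    headers = [("Content-Type", "text/plain"), ("Content-Length", str(len(body)))]
    start_response("200 OK", headers)
    return [body]


def serve(app, host="127.0.0.1", port=8080):
    """Serve any WSGI callable with the stdlib reference server (development only)."""
    with make_server(host, port, app) as httpd:
        httpd.serve_forever()  # blocks until interrupted


# Usage (not executed here because it blocks):
#   serve(placeholder_app)
```

The same serve() call should accept the real app unchanged, because WSGI servers only depend on the callable's (environ, start_response) signature.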

Client

from vgi_rpc import http_connect

with http_connect(MyService, "http://localhost:8080") as proxy:
    result = proxy.echo(message="hello")  # proxy is typed as MyService

Testing (no real server)

make_sync_client wraps a Falcon TestClient so you can test the full HTTP stack in-process:

from vgi_rpc import RpcServer
from vgi_rpc.http import http_connect, make_sync_client

server = RpcServer(MyService, MyServiceImpl())
client = make_sync_client(server)

with http_connect(MyService, client=client) as proxy:
    assert proxy.echo(message="hello") == "hello"

Landing Page

By default, GET {prefix} (e.g. GET /vgi) returns an HTML landing page showing the vgi-rpc logo, the protocol name, server ID, and links. When the server has enable_describe=True, the landing page includes a link to the describe page.

To disable the landing page:

app = make_wsgi_app(server, enable_landing_page=False)

POST {prefix} returns 405 Method Not Allowed — it does not interfere with RPC routing.

Describe Page

When the server has enable_describe=True, GET {prefix}/describe (e.g. GET /vgi/describe) returns an HTML page listing all methods, their parameters (name, type, default), return types, docstrings, and method type badges (UNARY / STREAM). The __describe__ introspection method is filtered out.

Both enable_describe=True on the RpcServer and enable_describe_page=True (the default) on make_wsgi_app() are required.

To disable only the HTML page while keeping the __describe__ RPC method available:

app = make_wsgi_app(server, enable_describe_page=False)

Reserved path

When the describe page is active, the path {prefix}/describe is reserved for the HTML page. If your service has an RPC method literally named describe, you must set enable_describe_page=False.
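
One way to guard against that collision is to derive the flag from the service class itself. The helper below is a hypothetical sketch: describe_page_kwargs is not part of vgi-rpc, and EchoService and QueryService are made-up protocol classes. It only checks whether the class defines a callable attribute named describe.

```python
from typing import Protocol


def describe_page_kwargs(service_cls: type) -> dict:
    """Return keyword arguments for make_wsgi_app() (hypothetical helper).

    Disables the HTML describe page when the service defines its own
    ``describe`` method, since both would claim ``{prefix}/describe``.
    """
    has_describe_method = callable(getattr(service_cls, "describe", None))
    return {"enable_describe_page": not has_describe_method}


class EchoService(Protocol):
    """Made-up service without a ``describe`` method."""

    def echo(self, message: str) -> str: ...


class QueryService(Protocol):
    """Made-up service whose ``describe`` method collides with the page."""

    def describe(self, table: str) -> str: ...


# Intended use (assumes `server` wraps the matching implementation):
#   app = make_wsgi_app(server, **describe_page_kwargs(QueryService))
```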

Not-Found Page

By default, make_wsgi_app() installs a friendly HTML 404 page for any request that does not match an RPC route. If someone navigates to the server root or a random path in a browser, they see the vgi-rpc logo, the service protocol name, and a link to vgi-rpc.query.farm instead of a generic error.

This does not affect RPC clients — a request to a valid RPC route for a non-existent method still returns a machine-readable Arrow IPC error with HTTP 404.

To disable the page:

app = make_wsgi_app(server, enable_not_found_page=False)

Sticky Sessions (opt-in)

HTTP sticky sessions let an RPC method bind a Python object — an open DuckDB cursor, a loaded model handle, a streaming LLM client — to the worker process that opened it, keyed by a signed session token that the client echoes in a VGI-Session header. Subsequent requests from the same client (inside a with_session_token() block) carry the header and the framework restores the object as ctx.session. Misroutes, expiries, and process restarts surface as a typed SessionLostError so apps can decide whether to retry or fail loudly.

The full wire contract — token format, header conventions, error kinds, the per-session serialization model, drain and crash semantics, load-balancer integration — lives in docs/sticky-sessions-spec.md. The quickstart:

from vgi_rpc import RpcServer, make_wsgi_app

server = RpcServer(MyService, MyServiceImpl())
app = make_wsgi_app(server, enable_sticky=True, sticky_default_ttl=300)

A method body opens a session by handing the framework a state object:

import duckdb


class MyServiceImpl:
    def open_query(self, sql: str, ctx) -> str:
        cursor = duckdb.connect().execute(sql)
        ctx.open_session(cursor)               # framework mints + returns the token
        return "ok"

    def next_rows(self, n: int, ctx) -> bytes:
        return ctx.session.fetch_arrow_table(n).serialize().to_pybytes()

    def close_query(self, ctx) -> None:
        ctx.close_session()                    # closes cursor + evicts entry

On the client side, every session-using call lives inside a with_session_token() block — that's the opt-in signal the server requires (the leaked-session guard):

from vgi_rpc.http import http_connect

with http_connect(MyService, "http://localhost:8080") as conn, conn.with_session_token() as sess:
    sess.open_query(sql="SELECT * FROM big")
    rows = sess.next_rows(n=1000)
    sess.close_query()

The block's exit fires a best-effort DELETE {prefix}/__session__ (e.g. DELETE /vgi/__session__) so handle-bearing state gets released promptly. To stash a token across processes, call sess.detach() before the block exits; that hands the caller the token and suppresses the DELETE, so the server-side session survives until its TTL expires or another caller closes it.

HTTP-only. Sticky machinery is not installed on pipe/subprocess/unix transports — those run as single processes where "sticky" is meaningless. ctx.open_session raises RuntimeError("sticky sessions not available on this transport") if called over a non-HTTP transport, so apps can detect-and-fall-back.
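
The detect-and-fall-back pattern might look like the following sketch. The two context classes are hypothetical stand-ins that mimic only the behaviour described above (the real ctx object is supplied by the framework), and open_or_inline is an illustrative name, not a vgi-rpc function.

```python
class HttpStickyCtx:
    """Stand-in for a context on the HTTP transport with sticky sessions enabled."""

    def __init__(self):
        self.session = None

    def open_session(self, state):
        self.session = state


class PipeCtx:
    """Stand-in for a context on a pipe/subprocess/unix transport."""

    def open_session(self, state):
        raise RuntimeError("sticky sessions not available on this transport")


def open_or_inline(ctx, rows):
    """Keep a cursor-like iterator server-side when possible, else return everything at once."""
    cursor = iter(rows)
    try:
        ctx.open_session(cursor)
    except RuntimeError:
        # No sticky machinery on this transport: fall back to one full response.
        return {"mode": "inline", "rows": list(cursor)}
    # Session opened: later calls would page through ctx.session.
    return {"mode": "session", "rows": []}
```

Catching RuntimeError this broadly is a simplification; a stricter version could inspect the message before falling back.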

Client-driven routing via echo headers

Sticky LBs are not the only way to get a session-token-carrying request back to the worker that owns the session. With echo headers, the server tells the client (at session-open time) to attach an arbitrary set of headers on every subsequent request in the session, and the platform's edge proxy routes on those headers. Two helpers ship for Fly.io, where fly-force-instance-id is the proactive routing header fly-proxy honours:

from vgi_rpc import RpcServer
from vgi_rpc.http import make_wsgi_app
from vgi_rpc.http.fly import auto_server_id, fly_sticky_echo_headers

server = RpcServer(
    MyService, MyServiceImpl(),
    server_id=auto_server_id(),                # ⇒ FLY_MACHINE_ID on Fly, random elsewhere
)
app = make_wsgi_app(
    server,
    enable_sticky=True,
    sticky_echo_headers=fly_sticky_echo_headers(),  # ⇒ {"fly-force-instance-id": <id>} on Fly, None elsewhere
)

On Fly the server emits VGI-Echo-fly-force-instance-id: <machine-id> on session-opening responses; the client captures it and replays fly-force-instance-id: <machine-id> on every subsequent request in the session; fly-proxy routes directly to the owning Machine. No LB configuration required.

Off Fly the helpers return None so the same code is a no-op — operators don't need conditional branches.
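
To illustrate the no-op behaviour, here is a rough sketch of what such a helper could look like. This is an assumption based only on the comments in the example above (FLY_MACHINE_ID on Fly, None elsewhere); the actual implementation in vgi_rpc/http/fly.py is not shown on this page, and sketch_fly_echo_headers is a hypothetical name.

```python
import os
from typing import Mapping, Optional


def sketch_fly_echo_headers(
    environ: Mapping[str, str] = os.environ,
) -> Optional[dict]:
    """Return the echo-header mapping on Fly, or None anywhere else (sketch)."""
    machine_id = environ.get("FLY_MACHINE_ID")
    if not machine_id:
        # Not running on Fly: returning None leaves sticky_echo_headers unset.
        return None
    return {"fly-force-instance-id": machine_id}
```

Because the off-Fly result is None, the value can be passed straight through as sticky_echo_headers without a platform check at the call site.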

Generic API (for non-Fly platforms): pass any dict[str, str] as sticky_echo_headers and the server will emit them as VGI-Echo-<name> on the session-opening response. The client's with_session_token() view captures + replays automatically; sess.current_echo_headers() exposes the captured map for inspection or stashing.
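
The capture-and-replay step can be pictured with plain dictionaries. The sketch below is not the client's real code; the function names are hypothetical, and it assumes only what the text states: the server emits VGI-Echo-<name>, and the client strips that prefix and sends <name> on later requests.

```python
ECHO_PREFIX = "vgi-echo-"


def capture_echo_headers(response_headers):
    """Collect headers to replay from a session-opening response (sketch)."""
    captured = {}
    for name, value in response_headers.items():
        # HTTP header names are case-insensitive, so compare in lower case.
        if name.lower().startswith(ECHO_PREFIX):
            captured[name[len(ECHO_PREFIX):]] = value
    return captured


def with_echo(request_headers, captured):
    """Return a copy of the request headers with the captured headers added."""
    merged = dict(request_headers)
    merged.update(captured)
    return merged


opening_response = {
    "Content-Type": "application/octet-stream",
    "VGI-Echo-fly-force-instance-id": "machine-1",
}
echo = capture_echo_headers(opening_response)
next_request = with_echo({"VGI-Session": "token"}, echo)
```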

API Reference

Server

make_wsgi_app

make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "",
    token_key: bytes | None = None,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: (
        Callable[[Request], AuthContext] | None
    ) = None,
    cors_origins: str | Iterable[str] | None = None,
    cors_max_age: int | None = 7200,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: (
        OAuthResourceMetadata | None
    ) = None,
    max_stream_response_bytes: int | None = None,
    enable_sticky: bool = False,
    sticky_default_ttl: float = 300.0,
    sticky_echo_headers: Mapping[str, str] | None = None
) -> App[Request, Response]

Create a Falcon WSGI app that serves RPC requests over HTTP.

PARAMETER DESCRIPTION
server

The RpcServer instance to serve.

TYPE: RpcServer

prefix

URL prefix for all RPC endpoints (default "" — root).

TYPE: str DEFAULT: ''

token_key

AEAD (XChaCha20-Poly1305) master key for sealing stream state tokens. When None (the default), a random 32-byte key is generated per process. This means state tokens issued by one worker are invalid in another — you must provide a shared key for multi-process deployments (e.g. gunicorn with multiple workers).

TYPE: bytes | None DEFAULT: None

max_response_bytes

HTTP body cap. Measured against the on-wire body size only (resp_buf.tell()); externalised payloads are governed by the separate max_externalized_response_bytes below. Applies to every HTTP method (unary, exchange, and producer streams). For producer streams it controls when the framework mints a continuation token to split a long response across multiple HTTP turns. When None (the default), no body cap is enforced: producer streams emit one batch per HTTP response for incremental streaming, and unary/exchange responses are unbounded. Phase B will introduce a strict failure when a body would exceed this cap and externalisation cannot rescue it; until then, the cap only governs producer continuation-token boundaries.

TYPE: int | None DEFAULT: None

max_externalized_response_bytes

Cap on the external channel — total bytes uploaded to external storage across one HTTP response (one producer turn or one unary/exchange call). Bounds how much data the client will end up fetching for one RPC, regardless of how the framework chose to deliver it. Default None is unbounded (current behaviour). Without this, a worker that emits 10 GB with externalisation enabled produces a tiny HTTP body but a 10 GB upload + 10 GB client fetch — operators with a per-call data budget need this knob to stop that.

TYPE: int | None DEFAULT: None

max_request_bytes

When set, the value is advertised via the VGI-Max-Request-Bytes response header on every response (including OPTIONS). Clients can use http_capabilities() to discover this limit and decide whether to use external storage for large payloads. The cap is also enforced server-side by _MaxRequestBytesMiddleware; paths under {prefix}/__upload_url__ and {prefix}/health are exempt. None (default) omits the header.

TYPE: int | None DEFAULT: None

authenticate

Optional callback that extracts an AuthContext from a Falcon Request. When provided, every request is authenticated before dispatch. The callback should raise ValueError (bad credentials) or PermissionError (forbidden) on failure; both are mapped to HTTP 401. Other exceptions propagate as 500.

TYPE: Callable[[Request], AuthContext] | None DEFAULT: None

cors_origins

Allowed origins for CORS. Pass "*" to allow all origins, a single origin string like "https://example.com", or an iterable of origin strings. None (the default) disables CORS headers. Uses Falcon's built-in CORSMiddleware which also handles preflight OPTIONS requests automatically.

TYPE: str | Iterable[str] | None DEFAULT: None

cors_max_age

Value for the Access-Control-Max-Age header on preflight OPTIONS responses, in seconds. 7200 (2 hours) by default. None omits the header. Only effective when cors_origins is set.

TYPE: int | None DEFAULT: 7200

upload_url_provider

Optional provider for generating pre-signed upload URLs. When set, the __upload_url__/init endpoint is enabled and VGI-Upload-URL-Support: true is advertised on every response.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

When set (and upload_url_provider is set), advertised via the VGI-Max-Upload-Bytes header. Informs clients of the maximum size they may upload to vended URLs. Advertisement only — no server-side enforcement.

TYPE: int | None DEFAULT: None

otel_config

Optional OtelConfig for OpenTelemetry instrumentation. When provided, instrument_server() is called and _OtelFalconMiddleware is prepended for W3C trace propagation. Requires pip install vgi-rpc[otel].

TYPE: object | None DEFAULT: None

sentry_config

Optional SentryConfig for Sentry error reporting. When provided, instrument_server_sentry() is called. Requires pip install vgi-rpc[sentry].

TYPE: object | None DEFAULT: None

token_ttl

Maximum age of stream state tokens in seconds. Tokens older than this are rejected with HTTP 400. Default is 3600 (1 hour). Set to 0 to disable expiry checking.

TYPE: int DEFAULT: 3600

compression_level

Zstandard compression level for HTTP request/response bodies. 3 (the default) installs _CompressionMiddleware at zstd level 3 and gzip level 6 so peers without zstd support negotiate gzip transparently. Valid zstd range is 1-22. None disables compression entirely (no codec is advertised, and bodies travel uncompressed). Set VGI_HTTP_DISABLE_ZSTD=1 in the environment to drop zstd from the advertised set even when zstandard is installed, which is useful for testing the gzip-fallback path.

TYPE: int | None DEFAULT: 3

enable_not_found_page

When True (the default), requests to paths that do not match any RPC route receive a friendly HTML 404 page. Set to False to use Falcon's default 404 behaviour instead.

TYPE: bool DEFAULT: True

enable_landing_page

When True (the default), GET {prefix} returns a friendly HTML landing page showing the protocol name, server ID, and links. Set to False to disable.

TYPE: bool DEFAULT: True

enable_describe_page

When True (the default) and the server has enable_describe=True, GET {prefix}/describe returns an HTML page listing all methods, parameters, and types. The path {prefix}/describe is reserved when active — an RPC method named describe would need the page disabled.

TYPE: bool DEFAULT: True

enable_health_endpoint

When True (the default), GET {prefix}/health returns a JSON health check response with the server's status, ID, and protocol name. The endpoint bypasses authentication. Set to False to disable.

TYPE: bool DEFAULT: True

repo_url

Optional URL to the service's source repository (e.g. a GitHub URL). When provided, a "Source repository" link appears on the landing page and describe page.

TYPE: str | None DEFAULT: None

oauth_resource_metadata

Optional OAuthResourceMetadata for RFC 9728 OAuth discovery. When provided, serves /.well-known/oauth-protected-resource and adds WWW-Authenticate: Bearer resource_metadata="..." to 401 responses.

TYPE: OAuthResourceMetadata | None DEFAULT: None

max_stream_response_bytes

Deprecated alias for max_response_bytes retained for backward compatibility. Emits a DeprecationWarning when set. Will be removed in a future release.

TYPE: int | None DEFAULT: None

enable_sticky

Master switch for HTTP sticky sessions. When True, CallContext.open_session becomes available on methods that opt in; the framework registers a per-worker session registry, a daemon reaper thread that evicts on TTL, the _StickyMiddleware that resolves the VGI-Session request header to a registry entry, and the DELETE {prefix}/__session__ framework-managed endpoint for client-initiated teardown. False (default) leaves the framework byte-identical to the pre-sticky wire path — no behavioral change for callers that don't use sticky.

TYPE: bool DEFAULT: False

sticky_default_ttl

Default session TTL in seconds applied by ctx.open_session when its ttl argument is None. 300.0 (5 minutes) by default. Methods can override per-session via ctx.open_session(state, ttl=60). Only meaningful when enable_sticky=True.

TYPE: float DEFAULT: 300.0

sticky_echo_headers

Optional mapping of headers the server tells the client to echo on every subsequent request inside a with_session_token() block. Emitted as VGI-Echo-<name>: <value> on session-opening responses; the client strips the prefix and replays the inner header on later requests. Used for client-driven routing: on Fly.io, for example, pass {"fly-force-instance-id": FLY_MACHINE_ID} so subsequent requests inside the session carry fly-force-instance-id and fly-proxy routes directly to the owning Machine. Names are also advertised in the VGI-Sticky-Echo-Headers capability header so clients/LBs can introspect the contract via OPTIONS /health. See vgi_rpc/http/fly.py for a Fly-specific helper. Only meaningful when enable_sticky=True.

TYPE: Mapping[str, str] | None DEFAULT: None
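
For the token_key parameter above, a multi-worker deployment needs every process to receive the same key. The sketch below shows one way to do that with an environment variable. The variable name VGI_TOKEN_KEY, the hex encoding, and both function names are assumptions for illustration; the 32-byte check mirrors the size of the key the factory generates by default, and the library may accept other lengths.

```python
import os
import secrets


def new_token_key_hex() -> str:
    """Generate a fresh 32-byte key as hex; run once and distribute out of band."""
    return secrets.token_hex(32)


def load_token_key(env_var: str = "VGI_TOKEN_KEY") -> bytes:
    """Read the shared key from the environment (variable name is hypothetical)."""
    raw = os.environ.get(env_var)
    if raw is None:
        raise RuntimeError(f"{env_var} is not set; each worker would generate its own key")
    key = bytes.fromhex(raw)
    if len(key) != 32:
        raise ValueError(f"{env_var} must decode to 32 bytes, got {len(key)}")
    return key


# Intended use in each worker:
#   app = make_wsgi_app(server, token_key=load_token_key())
```

Failing loudly when the variable is missing avoids silently falling back to a per-process key, which is the situation the warning in the factory is about.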

RETURNS DESCRIPTION
App[Request, Response]

A Falcon application with routes for unary and stream RPC calls.
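
As an illustration of the authenticate contract described in the parameter list, here is a sketch of a bearer-token callback. It relies on Falcon's Request.get_header(); everything else is an assumption: SketchAuthContext is a stand-in because the fields of the real AuthContext are not shown on this page, and the token table and suspended set are made-up data.

```python
import hmac
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchAuthContext:
    """Hypothetical stand-in; a real callback would return vgi-rpc's AuthContext."""

    principal: str


# Made-up data; a real deployment would consult a secret store.
API_TOKENS = {"s3cr3t-alice": "alice", "s3cr3t-bob": "bob"}
SUSPENDED = {"bob"}


def authenticate(req):
    """Map an Authorization: Bearer header to a principal, or raise."""
    header = req.get_header("Authorization") or ""
    scheme, _, presented = header.partition(" ")
    if scheme.lower() != "bearer" or not presented:
        raise ValueError("missing bearer token")  # bad credentials
    for token, principal in API_TOKENS.items():
        # Constant-time comparison avoids leaking token prefixes via timing.
        if hmac.compare_digest(presented.encode(), token.encode()):
            if principal in SUSPENDED:
                raise PermissionError(f"{principal} is suspended")  # forbidden
            return SketchAuthContext(principal=principal)
    raise ValueError("unknown token")


# Intended use: app = make_wsgi_app(server, authenticate=authenticate)
```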

Source code in vgi_rpc/http/server/_factory.py
def make_wsgi_app(
    server: RpcServer,
    *,
    prefix: str = "",
    token_key: bytes | None = None,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    cors_origins: str | Iterable[str] | None = None,
    cors_max_age: int | None = 7200,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: OAuthResourceMetadata | None = None,
    max_stream_response_bytes: int | None = None,
    enable_sticky: bool = False,
    sticky_default_ttl: float = 300.0,
    sticky_echo_headers: Mapping[str, str] | None = None,
) -> falcon.App[falcon.Request, falcon.Response]:
    """Create a Falcon WSGI app that serves RPC requests over HTTP.

    Args:
        server: The RpcServer instance to serve.
        prefix: URL prefix for all RPC endpoints (default ``""`` — root).
        token_key: AEAD (XChaCha20-Poly1305) master key for sealing stream
            state tokens.  When ``None`` (the default), a random 32-byte
            key is generated **per process**.  This means state tokens
            issued by one worker are invalid in another — you **must**
            provide a shared key for multi-process deployments (e.g.
            gunicorn with multiple workers).
        max_response_bytes: HTTP body cap.  Measured against the on-wire
            body size only (``resp_buf.tell()``); externalised payloads
            are governed by the separate ``max_externalized_response_bytes``
            below.  Applies to every HTTP method (unary, exchange, and
            producer streams).  For producer streams it controls when
            the framework mints a continuation token to split a long
            response across multiple HTTP turns.  When ``None`` (the
            default), no body cap is enforced — producer streams emit
            one batch per HTTP response for incremental streaming, and
            unary/exchange responses are unbounded.  Phase B introduces
            strict-fail when a body would exceed this cap and
            externalisation cannot rescue it; until then the cap only
            governs producer continuation-token boundaries.
        max_externalized_response_bytes: Cap on the *external* channel —
            total bytes uploaded to external storage across one HTTP
            response (one producer turn or one unary/exchange call).
            Bounds how much data the client will end up fetching for one
            RPC, regardless of how the framework chose to deliver it.
            Default ``None`` is unbounded (current behaviour).  Without
            this, a worker that emits 10 GB with externalisation enabled
            produces a tiny HTTP body but a 10 GB upload + 10 GB client
            fetch — operators with a per-call data budget need this knob
            to stop that.
        max_request_bytes: When set, the value is advertised via the
            ``VGI-Max-Request-Bytes`` response header on every response
            (including OPTIONS).  Clients can use ``http_capabilities()``
            to discover this limit and decide whether to use external
            storage for large payloads.  The cap is also enforced
            server-side by ``_MaxRequestBytesMiddleware``; paths under
            ``{prefix}/__upload_url__`` and ``{prefix}/health`` are
            exempt.  ``None`` (default) omits the header.
        authenticate: Optional callback that extracts an :class:`AuthContext`
            from a Falcon ``Request``.  When provided, every request is
            authenticated before dispatch.  The callback should raise
            ``ValueError`` (bad credentials) or ``PermissionError``
            (forbidden) on failure — these are mapped to HTTP 401.
            Other exceptions propagate as 500.
        cors_origins: Allowed origins for CORS.  Pass ``"*"`` to allow all
            origins, a single origin string like ``"https://example.com"``,
            or an iterable of origin strings.  ``None`` (the default)
            disables CORS headers.  Uses Falcon's built-in
            ``CORSMiddleware`` which also handles preflight OPTIONS
            requests automatically.
        cors_max_age: Value for the ``Access-Control-Max-Age`` header on
            preflight OPTIONS responses, in seconds.  ``7200`` (2 hours)
            by default.  ``None`` omits the header.  Only effective when
            ``cors_origins`` is set.
        upload_url_provider: Optional provider for generating pre-signed
            upload URLs.  When set, the ``__upload_url__/init`` endpoint
            is enabled and ``VGI-Upload-URL-Support: true`` is advertised
            on every response.
        max_upload_bytes: When set (and ``upload_url_provider`` is set),
            advertised via the ``VGI-Max-Upload-Bytes`` header.  Informs
            clients of the maximum size they may upload to vended URLs.
            Advertisement only — no server-side enforcement.
        otel_config: Optional ``OtelConfig`` for OpenTelemetry instrumentation.
            When provided, ``instrument_server()`` is called and
            ``_OtelFalconMiddleware`` is prepended for W3C trace propagation.
            Requires ``pip install vgi-rpc[otel]``.
        sentry_config: Optional ``SentryConfig`` for Sentry error reporting.
            When provided, ``instrument_server_sentry()`` is called.
            Requires ``pip install vgi-rpc[sentry]``.
        token_ttl: Maximum age of stream state tokens in seconds.  Tokens
            older than this are rejected with HTTP 400.  Default is 3600
            (1 hour).  Set to ``0`` to disable expiry checking.
        compression_level: Zstandard compression level for HTTP request/
            response bodies.  ``3`` (the default) installs
            ``_CompressionMiddleware`` at zstd level 3 *and* gzip level 6
            so peers without zstd support negotiate gzip transparently.
            Valid zstd range is 1-22.  ``None`` disables compression
            entirely (no codec is advertised, and bodies travel
            uncompressed).  Set ``VGI_HTTP_DISABLE_ZSTD=1`` in the
            environment to drop zstd from the advertised set even when
            ``zstandard`` is installed — useful for testing the
            gzip-fallback path.
        enable_not_found_page: When ``True`` (the default), requests to
            paths that do not match any RPC route receive a friendly HTML
            404 page.  Set to ``False`` to use Falcon's default 404
            behaviour instead.
        enable_landing_page: When ``True`` (the default), ``GET {prefix}``
            returns a friendly HTML landing page showing the protocol name,
            server ID, and links.  Set to ``False`` to disable.
        enable_describe_page: When ``True`` (the default) **and** the server
            has ``enable_describe=True``, ``GET {prefix}/describe`` returns
            an HTML page listing all methods, parameters, and types.  The
            path ``{prefix}/describe`` is reserved when active — an RPC
            method named ``describe`` would need the page disabled.
        enable_health_endpoint: When ``True`` (the default),
            ``GET {prefix}/health`` returns a JSON health check response
            with the server's status, ID, and protocol name.  The endpoint
            bypasses authentication.  Set to ``False`` to disable.
        repo_url: Optional URL to the service's source repository (e.g. a
            GitHub URL).  When provided, a "Source repository" link appears
            on the landing page and describe page.
        oauth_resource_metadata: Optional ``OAuthResourceMetadata`` for
            RFC 9728 OAuth discovery.  When provided, serves
            ``/.well-known/oauth-protected-resource`` and adds
            ``WWW-Authenticate: Bearer resource_metadata="..."`` to 401
            responses.
        max_stream_response_bytes: **Deprecated** alias for
            ``max_response_bytes`` retained for backward compatibility.
            Emits a ``DeprecationWarning`` when set.  Will be removed
            in a future release.
        enable_sticky: Master switch for HTTP sticky sessions.  When
            ``True``, ``CallContext.open_session`` becomes available on
            methods that opt in; the framework registers a per-worker
            session registry, a daemon reaper thread that evicts on TTL,
            the ``_StickyMiddleware`` that resolves the ``VGI-Session``
            request header to a registry entry, and the
            ``DELETE {prefix}/__session__`` framework-managed endpoint
            for client-initiated teardown.  ``False`` (default) leaves
            the framework byte-identical to the pre-sticky wire path —
            no behavioral change for callers that don't use sticky.
        sticky_default_ttl: Default session TTL in seconds applied by
            ``ctx.open_session`` when its ``ttl`` argument is ``None``.
            ``300.0`` (5 minutes) by default.  Methods can override
            per-session via ``ctx.open_session(state, ttl=60)``.  Only
            meaningful when ``enable_sticky=True``.
        sticky_echo_headers: Optional mapping of headers the server tells
            the client to echo on every subsequent request inside a
            ``with_session_token()`` block.  Emitted as ``VGI-Echo-<name>:
            <value>`` on session-opening responses; the client strips the
            prefix and replays the inner header on later requests.  Used
            for client-driven routing — e.g. on Fly.io, pass
            ``{"fly-force-instance-id": FLY_MACHINE_ID}`` so subsequent
            requests inside the session carry ``fly-force-instance-id``
            and fly-proxy routes directly to the owning Machine.  Names
            are also advertised in the ``VGI-Sticky-Echo-Headers`` capability
            header so clients/LBs can introspect the contract via
            ``OPTIONS /health``.  See ``vgi_rpc/http/fly.py`` for a Fly-
            specific helper.  Only meaningful when ``enable_sticky=True``.

    Returns:
        A Falcon application with routes for unary and stream RPC calls.

    """
    # Deprecated alias: ``max_stream_response_bytes`` was renamed to
    # ``max_response_bytes`` once the cap stopped being stream-only.
    if max_stream_response_bytes is not None:
        if max_response_bytes is not None:
            raise TypeError("Pass either max_response_bytes or max_stream_response_bytes, not both")
        warnings.warn(
            "max_stream_response_bytes is deprecated; use max_response_bytes instead. "
            "The cap now applies to all HTTP method responses, not just streams.",
            DeprecationWarning,
            stacklevel=2,
        )
        max_response_bytes = max_stream_response_bytes

    if token_key is None:
        warnings.warn(
            "No token_key provided; generating a random per-process AEAD key. "
            "State tokens will be invalid across workers — pass a shared key "
            "for multi-process deployments.",
            stacklevel=2,
        )
        token_key = os.urandom(32)
    # OpenTelemetry instrumentation (optional)
    if otel_config is not None:
        from vgi_rpc.otel import OtelConfig, _OtelFalconMiddleware, instrument_server

        if not isinstance(otel_config, OtelConfig):
            raise TypeError(f"otel_config must be an OtelConfig instance, got {type(otel_config).__name__}")
        instrument_server(server, otel_config)

    # Sentry error reporting (optional)
    if sentry_config is not None:
        from vgi_rpc.sentry import SentryConfig, instrument_server_sentry

        if not isinstance(sentry_config, SentryConfig):
            raise TypeError(f"sentry_config must be a SentryConfig instance, got {type(sentry_config).__name__}")
        instrument_server_sentry(server, sentry_config)

    app_handler = _HttpRpcApp(
        server,
        token_key,
        max_response_bytes,
        max_request_bytes,
        upload_url_provider,
        max_upload_bytes,
        token_ttl,
        max_externalized_response_bytes=max_externalized_response_bytes,
    )
    middleware: list[Any] = [
        _TransportNotifyMiddleware(server),
        _DrainRequestMiddleware(),
        _RequestIdMiddleware(),
        _AccessLogContextMiddleware(),
    ]
    if enable_sticky:
        # Pin the server_id into req.env so _StickyMiddleware can validate
        # that incoming session tokens were minted by THIS worker. Must run
        # before the sticky middleware itself.
        middleware.append(_ServerIdEnvMiddleware(server.server_id))

    # Enforce the advertised max_request_bytes cap server-side.  The
    # __upload_url__/init route (and capability-discovery routes) are
    # exempt because their payloads are intrinsically tiny.
    if max_request_bytes is not None:
        middleware.append(
            _MaxRequestBytesMiddleware(
                max_request_bytes,
                exempt_prefixes=(
                    f"{prefix}/__upload_url__",
                    f"{prefix}/health",
                ),
            )
        )

    # Compression middleware decompresses request bodies and compresses
    # responses — must come before auth so handlers read plaintext bodies.
    # Decompression cap is 16x the wire cap: generous enough for normal
    # compression ratios on Arrow IPC bodies, tight enough that a tiny
    # compressed body cannot claim hundreds of MB and OOM the server.
    #
    # ``compression_level`` (the historical int knob) sets the zstd level;
    # gzip is always offered at level 6 alongside it so peers that can't
    # do zstd (browsers without a polyfill, Python aiohttp without
    # ``zstandard``) can still negotiate compression.  ``VGI_HTTP_DISABLE_ZSTD=1``
    # drops zstd from the advertised set even when ``zstandard`` is
    # importable — used by tests that need to exercise the gzip path
    # without uninstalling the package.
    enabled_encodings: tuple[Encoding, ...] = ()
    if compression_level is not None:
        max_decompressed_bytes = max_request_bytes * 16 if max_request_bytes is not None else None
        codec_levels: dict[Encoding, int] = {
            Encoding.ZSTD: compression_level,
            Encoding.GZIP: 6,
        }
        if os.environ.get("VGI_HTTP_DISABLE_ZSTD") == "1":
            codec_levels.pop(Encoding.ZSTD, None)
        # available_encodings() drops codecs whose runtime support is missing
        # (e.g. zstd when zstandard isn't installed) — keep the factory and
        # middleware in lockstep so the advertised list matches what we
        # actually accept.
        runtime = set(available_encodings())
        codec_levels = {enc: lvl for enc, lvl in codec_levels.items() if enc in runtime}
        enabled_encodings = tuple(codec_levels)
        if codec_levels:
            middleware.append(
                _CompressionMiddleware(
                    codec_levels,
                    max_decompressed_bytes=max_decompressed_bytes,
                )
            )

    # OTel middleware must come before auth so spans cover the full request
    if otel_config is not None:
        middleware.append(_OtelFalconMiddleware())

    # Always expose the auth, request-id, content-encoding, and RPC-error
    # headers; capability headers are appended conditionally below.
    cors_expose: list[str] = ["WWW-Authenticate", _REQUEST_ID_HEADER, "X-VGI-Content-Encoding", RPC_ERROR_HEADER]

    # Build capability headers
    capability_headers: dict[str, str] = {}
    if max_request_bytes is not None:
        capability_headers[MAX_REQUEST_BYTES_HEADER] = str(max_request_bytes)
        cors_expose.append(MAX_REQUEST_BYTES_HEADER)
    if max_response_bytes is not None:
        capability_headers[MAX_RESPONSE_BYTES_HEADER] = str(max_response_bytes)
        cors_expose.append(MAX_RESPONSE_BYTES_HEADER)
    if max_externalized_response_bytes is not None:
        capability_headers[MAX_EXTERNALIZED_RESPONSE_BYTES_HEADER] = str(max_externalized_response_bytes)
        cors_expose.append(MAX_EXTERNALIZED_RESPONSE_BYTES_HEADER)
    # Externalisation-enabled status reflects whether the server has a
    # storage backend wired up.  Conformance tests use this to decide
    # whether to expect externalised payloads (and to skip the strict-fail
    # tests that need it on/off).
    capability_headers[EXTERNALIZATION_ENABLED_HEADER] = (
        "true" if server.external_config is not None and server.external_config.storage is not None else "false"
    )
    cors_expose.append(EXTERNALIZATION_ENABLED_HEADER)
    if upload_url_provider is not None:
        capability_headers[UPLOAD_URL_HEADER] = "true"
        cors_expose.append(UPLOAD_URL_HEADER)
        if max_upload_bytes is not None:
            capability_headers[MAX_UPLOAD_BYTES_HEADER] = str(max_upload_bytes)
            cors_expose.append(MAX_UPLOAD_BYTES_HEADER)
    # Advertise the compression codecs we actually accept on the wire.  A
    # client compares this against its own codec set and picks the first
    # mutually supported one for its request body.  Absent header ⇒
    # ``{zstd}`` (back-compat with pre-gzip servers).
    if enabled_encodings:
        capability_headers[SUPPORTED_ENCODINGS_HEADER] = ", ".join(e.value for e in enabled_encodings)
        cors_expose.append(SUPPORTED_ENCODINGS_HEADER)
    # Sticky session capability headers — advertised only when sticky is on
    # so OPTIONS /health responses cleanly distinguish sticky-capable
    # servers from non-sticky ones. The session response headers are
    # exposed via CORS so browser clients inside with_session_token()
    # can read VGI-Session / VGI-Session-Close from cross-origin responses.
    if enable_sticky:
        capability_headers[STICKY_ENABLED_HEADER] = "true"
        capability_headers[STICKY_DEFAULT_TTL_HEADER] = str(int(sticky_default_ttl))
        cors_expose.append(STICKY_ENABLED_HEADER)
        cors_expose.append(STICKY_DEFAULT_TTL_HEADER)
        cors_expose.append(SESSION_HEADER)
        cors_expose.append(SESSION_CLOSE_HEADER)
        # Echo headers (PR2): advertise the names a client must replay on
        # subsequent session requests so clients/LBs can discover the
        # contract via OPTIONS /health. Each VGI-Echo-<name> response
        # header is also CORS-exposed so browser clients can read it.
        if sticky_echo_headers:
            capability_headers[STICKY_ECHO_HEADERS_HEADER] = ", ".join(sticky_echo_headers.keys())
            cors_expose.append(STICKY_ECHO_HEADERS_HEADER)
            cors_expose.extend(f"{ECHO_HEADER_PREFIX}{name}" for name in sticky_echo_headers)

    # OAuth resource metadata (RFC 9728)
    from vgi_rpc.http._oauth import OAuthResourceMetadata as _OAuthMeta
    from vgi_rpc.http._oauth import _build_www_authenticate

    www_authenticate: str | None = None
    _validated_oauth_metadata: _OAuthMeta | None = None
    if oauth_resource_metadata is not None:
        if not isinstance(oauth_resource_metadata, _OAuthMeta):
            raise TypeError(
                f"oauth_resource_metadata must be an OAuthResourceMetadata instance, "
                f"got {type(oauth_resource_metadata).__name__}"
            )
        _validated_oauth_metadata = oauth_resource_metadata
        www_authenticate = _build_www_authenticate(_validated_oauth_metadata, prefix)

    if cors_origins is not None:
        cors_kwargs: dict[str, Any] = {
            "allow_origins": cors_origins,
            "expose_headers": cors_expose,
        }
        middleware.append(falcon.CORSMiddleware(**cors_kwargs))
        if cors_max_age is not None:
            middleware.append(_CorsMaxAgeMiddleware(cors_max_age))
    # OAuth PKCE browser flow — only when authenticate + OAuth metadata + client_id
    _pkce_active = False
    _pkce_user_info_html: str | None = None
    _exempt_prefixes_list: list[str] = []
    if enable_health_endpoint:
        _exempt_prefixes_list.append(f"{prefix}/health")
    if (
        authenticate is not None
        and _validated_oauth_metadata is not None
        and _validated_oauth_metadata.client_id is not None
    ):
        from urllib.parse import urlparse as _urlparse

        from vgi_rpc.http._bearer import chain_authenticate
        from vgi_rpc.http._oauth_pkce import (
            _DEFAULT_ALLOWED_RETURN_ORIGINS,
            _create_oidc_discovery,
            _derive_session_key,
            _OAuthCallbackResource,
            _OAuthLogoutResource,
            _OAuthPkceMiddleware,
            _OAuthTokenProxyResource,
            build_user_info_html,
            make_cookie_authenticate,
        )

        _pkce_issuer = _validated_oauth_metadata.authorization_servers[0]
        _pkce_oidc_discovery = _create_oidc_discovery(_pkce_issuer)
        _pkce_session_key = _derive_session_key(token_key)
        _pkce_resource_parsed = _urlparse(_validated_oauth_metadata.resource)
        _pkce_secure = _pkce_resource_parsed.scheme == "https"
        _pkce_redirect_uri = f"{_pkce_resource_parsed.scheme}://{_pkce_resource_parsed.netloc}{prefix}/_oauth/callback"

        if not _pkce_secure and _pkce_resource_parsed.hostname not in ("localhost", "127.0.0.1", "::1"):
            _logger.warning(
                "OAuth PKCE is configured without HTTPS (%s) — cookies will not be Secure. "
                "This is acceptable for local development but not for production.",
                _validated_oauth_metadata.resource,
            )

        # Wrap authenticate to also accept tokens from a cookie
        _pkce_cookie_auth = make_cookie_authenticate(authenticate)
        authenticate = chain_authenticate(authenticate, _pkce_cookie_auth)

        _pkce_client_id: str = _validated_oauth_metadata.client_id
        _pkce_client_secret = _validated_oauth_metadata.client_secret
        _pkce_use_id_token = _validated_oauth_metadata.use_id_token_as_bearer
        _pkce_scope = (
            " ".join(_validated_oauth_metadata.scopes_supported)
            if _validated_oauth_metadata.scopes_supported
            else "openid email"
        )
        _exempt_prefixes_list.append(f"{prefix}/_oauth/")
        _pkce_active = True
        _pkce_user_info_html = build_user_info_html(prefix)

    on_auth_failure: Callable[[str | None, str], None] | None = None
    if authenticate is not None and otel_config is not None:
        from vgi_rpc.otel import OtelConfig as _OtelCfg
        from vgi_rpc.otel import make_auth_failure_counter

        assert isinstance(otel_config, _OtelCfg)  # validated above
        on_auth_failure = make_auth_failure_counter(otel_config, server.protocol_name)
    middleware.append(
        _AuthMiddleware(
            authenticate,
            www_authenticate=www_authenticate,
            on_auth_failure=on_auth_failure,
            exempt_prefixes=tuple(_exempt_prefixes_list),
        )
    )
    # Sticky middleware runs AFTER auth so AAD binding sees the authenticated
    # principal. The health endpoint is exempt because it must remain
    # cheap and auth-free even when sticky is on.
    sticky_registry: _SessionRegistry | None = None
    if enable_sticky:
        sticky_registry = _SessionRegistry(default_ttl=sticky_default_ttl)
        # The DELETE /__session__ resource does its own idempotent token
        # validation (returns 200 on any failure to avoid info leak), so
        # the middleware-level "session not found ⇒ EXCEPTION batch"
        # behaviour must NOT run for that endpoint — otherwise stale
        # DELETEs would surface as RpcError instead of clean 200s.
        middleware.append(
            _StickyMiddleware(
                sticky_registry,
                token_key,
                exempt_prefixes=(
                    f"{prefix}/health",
                    f"{prefix}/{_SESSION_ENDPOINT}",
                ),
                echo_headers=sticky_echo_headers,
            )
        )
    if authenticate is not None and _pkce_active:
        middleware.append(
            _OAuthPkceMiddleware(
                session_key=_pkce_session_key,
                oidc_discovery=_pkce_oidc_discovery,
                client_id=_pkce_client_id,
                prefix=prefix,
                secure_cookie=_pkce_secure,
                redirect_uri=_pkce_redirect_uri,
                scope=_pkce_scope,
            )
        )
    if capability_headers:
        middleware.append(_CapabilitiesMiddleware(capability_headers))
    app: falcon.App[falcon.Request, falcon.Response] = falcon.App(middleware=middleware or None)
    app.set_error_serializer(_error_serializer)

    # OAuth well-known endpoint (must be before RPC routes)
    if _validated_oauth_metadata is not None:
        from vgi_rpc.http._oauth import _OAuthResourceMetadataResource

        # When PKCE is active and a server-side client_secret is configured,
        # advertise the proxy token endpoint so SPA clients can perform PKCE
        # token exchanges without holding the secret themselves.
        _advertised_token_endpoint: str | None = None
        if _pkce_active and _validated_oauth_metadata.client_secret is not None:
            _advertised_token_endpoint = (
                f"{_pkce_resource_parsed.scheme}://{_pkce_resource_parsed.netloc}{prefix}/_oauth/token"
            )
        well_known = _OAuthResourceMetadataResource(_validated_oauth_metadata, _advertised_token_endpoint)
        app.add_route("/.well-known/oauth-protected-resource", well_known)
        if prefix and prefix != "/":
            app.add_route(f"/.well-known/oauth-protected-resource{prefix}", well_known)

    app.add_route(f"{prefix}/{{method}}", _RpcResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/init", _StreamInitResource(app_handler))
    app.add_route(f"{prefix}/{{method}}/exchange", _ExchangeResource(app_handler))
    if upload_url_provider is not None:
        app.add_route(f"{prefix}/__upload_url__/init", _UploadUrlResource(app_handler))
    if enable_sticky:
        assert sticky_registry is not None  # guaranteed by the enable_sticky branch above
        app.add_route(
            f"{prefix}/{_SESSION_ENDPOINT}",
            _SessionResource(sticky_registry, token_key),
        )

    # OAuth PKCE callback, logout, and token-proxy routes (must be before not-found sink)
    if _pkce_active:
        app.add_route(
            f"{prefix}/_oauth/callback",
            _OAuthCallbackResource(
                session_key=_pkce_session_key,
                oidc_discovery=_pkce_oidc_discovery,
                client_id=_pkce_client_id,
                client_secret=_pkce_client_secret,
                use_id_token=_pkce_use_id_token,
                prefix=prefix,
                secure_cookie=_pkce_secure,
                redirect_uri=_pkce_redirect_uri,
            ),
        )
        app.add_route(f"{prefix}/_oauth/logout", _OAuthLogoutResource(prefix, _pkce_secure))
        # Token-exchange proxy: lets SPA PKCE clients (which cannot safely
        # hold a client_secret) complete authorization_code/refresh_token
        # exchanges against IdPs that require client_secret (e.g. Google).
        app.add_route(
            f"{prefix}/_oauth/token",
            _OAuthTokenProxyResource(
                client_id=_pkce_client_id,
                client_secret=_pkce_client_secret,
                oidc_discovery=_pkce_oidc_discovery,
                allowed_origins=_DEFAULT_ALLOWED_RETURN_ORIGINS,
            ),
        )

    # Describe page — GET {prefix}/describe (requires both flags and server support)
    describe_page_active = enable_describe_page and server.describe_enabled
    if describe_page_active:
        describe_html = _build_describe_html(server, prefix, repo_url)
        if _pkce_user_info_html:
            describe_html = describe_html.replace(b"</body>", _pkce_user_info_html.encode() + b"\n</body>")
        app.add_route(f"{prefix}/describe", _DescribePageResource(describe_html))

    # Health endpoint — GET {prefix}/health
    if enable_health_endpoint:
        app.add_route(f"{prefix}/health", _HealthResource(server.server_id, server.protocol_name))

    # Landing page — GET {prefix}
    if enable_landing_page:
        describe_path = f"{prefix}/describe" if describe_page_active else None
        landing_body = _build_landing_html(prefix, server.protocol_name, server.server_id, describe_path, repo_url)
        if _pkce_user_info_html:
            landing_body = landing_body.replace(b"</body>", _pkce_user_info_html.encode() + b"\n</body>")
        app.add_route(prefix or "/", _LandingPageResource(landing_body))

    if enable_not_found_page:
        app.add_sink(_make_not_found_sink(prefix, server.protocol_name))

    _logger.info(
        "WSGI app created for %s (server_id=%s, prefix=%s, auth=%s)",
        server.protocol_name,
        server.server_id,
        prefix,
        "enabled" if authenticate is not None else "disabled",
        extra={
            "server_id": server.server_id,
            "protocol": server.protocol_name,
            "prefix": prefix,
            "auth_enabled": authenticate is not None,
        },
    )

    return app
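
The comments in make_wsgi_app above say that a client compares the advertised codec list against its own and picks the first mutually supported codec, and that a missing header means zstd only. A minimal sketch of that selection follows. The function name pick_request_encoding is hypothetical, and the wire values "zstd" and "gzip" and the use of the client's preference order for "first" are assumptions rather than something the listing confirms.

```python
from typing import List, Optional


def pick_request_encoding(advertised: Optional[str], client_codecs: List[str]) -> Optional[str]:
    """Return the first client codec the server also accepts, or None.

    Hypothetical helper, not part of vgi_rpc. ``advertised`` is the raw value
    of the supported-encodings capability header, or None when it is absent.
    """
    if advertised is None:
        # Absent header: assume a zstd-only server (pre-gzip behaviour).
        server_codecs = {"zstd"}
    else:
        # The server joins codec names with ", "; tolerate extra whitespace.
        server_codecs = {tok.strip().lower() for tok in advertised.split(",") if tok.strip()}
    # Assumption: "first" means first in the client's own preference order.
    for codec in client_codecs:
        if codec in server_codecs:
            return codec
    return None
```

A client that only has gzip would call pick_request_encoding("zstd, gzip", ["gzip"]) and send gzip-compressed bodies; when nothing matches, sending the body uncompressed is the natural fallback.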

serve_http

serve_http(
    server: RpcServer,
    *,
    host: str = "127.0.0.1",
    port: int = 0,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    max_stream_response_bytes: int | None = None,
    enable_sticky: bool = False,
    sticky_default_ttl: float = 300.0,
    sticky_echo_headers: Mapping[str, str] | None = None,
    drain_grace_seconds: float = 30.0,
    install_signal_handlers: bool = True
) -> None

Serve an RpcServer over HTTP using waitress.

This is a convenience wrapper that combines make_wsgi_app with automatic port selection and waitress.serve.

The selected port is printed to stdout as PORT:<port> for machine-readable discovery (e.g. by test harnesses or process managers).
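
A test harness can recover the port by scanning the child's stdout for the PORT:<port> line. The sketch below shows one way to do that with the standard library only; parse_port_line and read_port are hypothetical helper names, not part of vgi_rpc.

```python
from typing import Iterable, Optional


def parse_port_line(line: str) -> Optional[int]:
    """Return the port from a ``PORT:<port>`` line, or None for any other line."""
    line = line.strip()
    if not line.startswith("PORT:"):
        return None
    value = line[len("PORT:"):]
    return int(value) if value.isdigit() else None


def read_port(lines: Iterable[str]) -> Optional[int]:
    """Scan lines (e.g. a subprocess's stdout) until a PORT line appears."""
    for line in lines:
        port = parse_port_line(line)
        if port is not None:
            return port
    return None
```

With subprocess.Popen(..., stdout=subprocess.PIPE, text=True), passing proc.stdout to read_port blocks until the server announces its port or the stream closes.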

When enable_sticky=True (and install_signal_handlers=True, the default), this wrapper installs SIGTERM / SIGINT handlers that perform a graceful drain:

  1. First signal: flip the registry's drain flag so subsequent ctx.open_session calls raise vgi_rpc.rpc.ServerDrainingError. Existing sessions continue to serve.
  2. After drain_grace_seconds (in a daemon timer thread): invoke state.close() on every live session and os._exit(0).
  3. Second signal: skip the grace period and exit immediately.

For pre-fork servers (gunicorn, uwsgi) operators wire their own worker_exit hooks. See vgi_rpc.http.drain_handle and the spec at docs/sticky-sessions-spec.md for the operator recipe.
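
The two-stage drain described above can be sketched with a timer thread. This is an illustration of the pattern only, not the library's handler: DrainController and its callbacks are hypothetical names, and skipping the session-close step on the second signal is an assumption.

```python
import threading
from typing import Callable


class DrainController:
    """Hypothetical two-stage drain helper (not part of vgi_rpc)."""

    def __init__(
        self,
        grace_seconds: float,
        close_sessions: Callable[[], None],
        exit_fn: Callable[[int], None],
    ) -> None:
        self.grace_seconds = grace_seconds
        self.close_sessions = close_sessions
        self.exit_fn = exit_fn  # os._exit in a real process; injectable for tests
        self.draining = False
        self._lock = threading.Lock()
        self._timer = None

    def on_signal(self, signum: int = 0, frame: object = None) -> None:
        """Signal-handler-shaped entry point: first call drains, second exits."""
        with self._lock:
            if not self.draining:
                # First signal: flip the flag and start the grace timer.
                self.draining = True
                self._timer = threading.Timer(self.grace_seconds, self._finish)
                self._timer.daemon = True
                self._timer.start()
                return
            # Second signal: cancel the pending timer before exiting now.
            if self._timer is not None:
                self._timer.cancel()
        self.exit_fn(0)

    def _finish(self) -> None:
        # Grace period elapsed: close live sessions, then exit.
        self.close_sessions()
        self.exit_fn(0)
```

In a real process this would be wired up with signal.signal(signal.SIGTERM, controller.on_signal) and the same for SIGINT, with exit_fn=os._exit; request handlers would consult controller.draining before opening new sessions.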

PARAMETER DESCRIPTION
server

The RpcServer to expose.

TYPE: RpcServer

host

Bind address (default 127.0.0.1).

TYPE: str DEFAULT: '127.0.0.1'

port

TCP port. 0 (the default) auto-selects a free port.

TYPE: int DEFAULT: 0

max_response_bytes

HTTP body cap; applies to every method. See make_wsgi_app for full semantics.

TYPE: int | None DEFAULT: None

max_externalized_response_bytes

Cap on bytes uploaded to external storage per HTTP response. See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_stream_response_bytes

Deprecated alias for max_response_bytes.

TYPE: int | None DEFAULT: None

enable_sticky

See make_wsgi_app.

TYPE: bool DEFAULT: False

sticky_default_ttl

See make_wsgi_app.

TYPE: float DEFAULT: 300.0

sticky_echo_headers

See make_wsgi_app.

TYPE: Mapping[str, str] | None DEFAULT: None

drain_grace_seconds

Seconds to wait between flipping the drain flag (on the first SIGTERM / SIGINT) and forcibly exiting. Existing sessions get this long to complete in-flight work. Default 30.0. Ignored when sticky is disabled.

TYPE: float DEFAULT: 30.0

install_signal_handlers

When True (the default), install the SIGTERM / SIGINT handlers described above. Set to False when embedding serve_http inside a larger process that already owns signal handling (rare; the default is correct for the standard "one process, serve until killed" deployment).

TYPE: bool DEFAULT: True

Source code in vgi_rpc/http/server/_serve.py
def serve_http(
    server: RpcServer,
    *,
    host: str = "127.0.0.1",
    port: int = 0,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    max_stream_response_bytes: int | None = None,
    enable_sticky: bool = False,
    sticky_default_ttl: float = 300.0,
    sticky_echo_headers: Mapping[str, str] | None = None,
    drain_grace_seconds: float = 30.0,
    install_signal_handlers: bool = True,
) -> None:
    """Serve an ``RpcServer`` over HTTP using waitress.

    This is a convenience wrapper that combines :func:`make_wsgi_app` with
    automatic port selection and ``waitress.serve``.

    The selected port is printed to stdout as ``PORT:<port>`` for
    machine-readable discovery (e.g. by test harnesses or process managers).

    When ``enable_sticky=True`` (and ``install_signal_handlers=True``, the
    default), this wrapper installs SIGTERM / SIGINT handlers that perform
    a graceful drain:

    1. First signal: flip the registry's drain flag so subsequent
       ``ctx.open_session`` calls raise :class:`~vgi_rpc.rpc.ServerDrainingError`.
       Existing sessions continue to serve.
    2. After ``drain_grace_seconds`` (in a daemon timer thread): invoke
       ``state.close()`` on every live session and ``os._exit(0)``.
    3. Second signal: skip the grace period and exit immediately.

    For pre-fork servers (gunicorn, uwsgi) operators wire their own
    ``worker_exit`` hooks. See :func:`vgi_rpc.http.drain_handle` and the
    spec at ``docs/sticky-sessions-spec.md`` for the operator recipe.

    Args:
        server: The ``RpcServer`` to expose.
        host: Bind address (default ``127.0.0.1``).
        port: TCP port.  ``0`` (the default) auto-selects a free port.
        max_response_bytes: HTTP body cap; applies to every method.  See
            :func:`make_wsgi_app` for full semantics.
        max_externalized_response_bytes: Cap on bytes uploaded to external
            storage per HTTP response.  See :func:`make_wsgi_app`.
        max_stream_response_bytes: **Deprecated** alias for
            ``max_response_bytes``.
        enable_sticky: See :func:`make_wsgi_app`.
        sticky_default_ttl: See :func:`make_wsgi_app`.
        sticky_echo_headers: See :func:`make_wsgi_app`.
        drain_grace_seconds: Seconds to wait between flipping the drain
            flag (on the first SIGTERM / SIGINT) and forcibly exiting.
            Existing sessions get this long to complete in-flight work.
            Default ``30.0``.  Ignored when sticky is disabled.
        install_signal_handlers: When ``True`` (the default), install the
            SIGTERM / SIGINT handlers described above.  Set to ``False``
            when embedding ``serve_http`` inside a larger process that
            already owns signal handling (rare; the default is correct
            for the standard "one process, serve until killed" deployment).

    """
    if max_stream_response_bytes is not None:
        if max_response_bytes is not None:
            raise TypeError("Pass either max_response_bytes or max_stream_response_bytes, not both")
        warnings.warn(
            "max_stream_response_bytes is deprecated; use max_response_bytes instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        max_response_bytes = max_stream_response_bytes

    try:
        import waitress as _waitress
    except ImportError:
        print("HTTP transport requires waitress: pip install vgi-rpc[http]", file=sys.stderr)
        sys.exit(1)

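    if port == 0:
        # Ask the OS for a free port by binding to port 0, then release the
        # socket so waitress can bind the same port.  Another process could
        # claim the port in the gap between this close and waitress.serve.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind((host, 0))
            port = int(s.getsockname()[1])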

    app = make_wsgi_app(
        server,
        max_response_bytes=max_response_bytes,
        max_externalized_response_bytes=max_externalized_response_bytes,
        enable_sticky=enable_sticky,
        sticky_default_ttl=sticky_default_ttl,
        sticky_echo_headers=sticky_echo_headers,
    )

    if install_signal_handlers and enable_sticky:
        _install_drain_signal_handlers(app, drain_grace_seconds)

    print(f"PORT:{port}", flush=True)
    print(f"Serving on http://{host}:{port}/", file=sys.stderr, flush=True)
    _waitress.serve(app, host=host, port=port, _quiet=True)

Client

http_connect

http_connect(
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    on_log: Callable[[Message], None] | None = None,
    client: Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None,
    compression_level: int | None = 3
) -> Iterator[P]

Connect to an HTTP RPC server and yield a typed proxy.

PARAMETER DESCRIPTION
protocol

The Protocol class defining the RPC interface.

TYPE: type[P]

base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None; ignored when a pre-built client is provided. The internally-created client follows redirects transparently.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. When None (the default), auto-detected from a _SyncTestClient's .prefix attribute, or "" for other clients.

TYPE: str | None DEFAULT: None

on_log

Optional callback for log messages from the server.

TYPE: Callable[[Message], None] | None DEFAULT: None

client

Optional HTTP client — httpx.Client for production, or a _SyncTestClient from make_sync_client() for testing.

TYPE: Client | _SyncTestClient | None DEFAULT: None

external_location

Optional ExternalLocationConfig for resolving and producing externalized batches.

TYPE: ExternalLocationConfig | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures. When None (the default), no retries are attempted.

TYPE: HttpRetryConfig | None DEFAULT: None

compression_level

Zstandard compression level for request bodies. 3 (the default) compresses requests and adds Content-Encoding: zstd. None disables request compression (httpx still auto-decompresses server responses).

TYPE: int | None DEFAULT: 3

YIELDS DESCRIPTION
P

A typed RPC proxy supporting all methods defined on protocol.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.
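
The prefix rule documented above (an explicit prefix wins; otherwise the client's .prefix attribute is used, falling back to "") can be illustrated in isolation. In this sketch resolve_prefix, method_url, and FakeTestClient are hypothetical stand-ins, and the "{prefix}/{method}" URL shape is taken from the server routes shown earlier on this page.

```python
from typing import Optional


class FakeTestClient:
    """Hypothetical stand-in for a test client that carries a ``prefix``."""

    prefix = "/vgi"


def resolve_prefix(client: object, prefix: Optional[str]) -> str:
    """Explicit prefix wins; otherwise use client.prefix, defaulting to ''."""
    return getattr(client, "prefix", "") if prefix is None else prefix


def method_url(prefix: str, method: str) -> str:
    """Build the request path for a unary method call."""
    return f"{prefix}/{method}"
```

Note that passing prefix="" is different from leaving it as None: the empty string is treated as an explicit choice and suppresses auto-detection.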

Source code in vgi_rpc/http/_client.py
@contextlib.contextmanager
def http_connect[P](
    protocol: type[P],
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    on_log: Callable[[Message], None] | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    external_location: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
    compression_level: int | None = 3,
) -> Iterator[P]:
    """Connect to an HTTP RPC server and yield a typed proxy.

    Args:
        protocol: The Protocol class defining the RPC interface.
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``; ignored when a pre-built
            *client* is provided.  The internally-created client follows
            redirects transparently.
        prefix: URL prefix matching the server's prefix.  When ``None``
            (the default), auto-detected from a ``_SyncTestClient``'s
            ``.prefix`` attribute, or ``""`` for other clients.
        on_log: Optional callback for log messages from the server.
        client: Optional HTTP client — ``httpx.Client`` for production,
            or a ``_SyncTestClient`` from ``make_sync_client()`` for testing.
        external_location: Optional ExternalLocationConfig for
            resolving and producing externalized batches.
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.
            When ``None`` (the default), no retries are attempted.
        compression_level: Zstandard compression level for request bodies.
            ``3`` (the default) compresses requests and adds
            ``Content-Encoding: zstd``.  ``None`` disables request
            compression (httpx still auto-decompresses server responses).

    Yields:
        A typed RPC proxy supporting all methods defined on *protocol*.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)

    # Auto-detect prefix from _SyncTestClient when not explicitly provided
    url_prefix = getattr(client, "prefix", "") if prefix is None else prefix
    try:
        yield cast(
            P,
            _HttpProxy(
                protocol,
                client,
                url_prefix,
                on_log,
                external_config=external_location,
                ipc_validation=ipc_validation,
                retry_config=retry,
                compression_level=compression_level,
            ),
        )
    finally:
        if own_client:
            client.close()

http_introspect

http_introspect(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = FULL,
    retry: HttpRetryConfig | None = None
) -> ServiceDescription

Send a __describe__ request over HTTP and return a ServiceDescription.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. None auto-detects from _SyncTestClient.

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

ipc_validation

Validation level for incoming IPC batches.

TYPE: IpcValidation DEFAULT: FULL

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
ServiceDescription

A ServiceDescription with all method metadata.

RAISES DESCRIPTION
RpcError

If the server does not support introspection or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_introspect(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    retry: HttpRetryConfig | None = None,
) -> ServiceDescription:
    """Send a ``__describe__`` request over HTTP and return a ``ServiceDescription``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        ipc_validation: Validation level for incoming IPC batches.
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A ``ServiceDescription`` with all method metadata.

    Raises:
        RpcError: If the server does not support introspection or returns
            an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    from vgi_rpc.introspect import DESCRIBE_METHOD_NAME, parse_describe_batch

    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        # Build a minimal request: empty params with __describe__ method name
        req_buf = BytesIO()
        request_metadata = pa.KeyValueMetadata(
            {
                b"vgi_rpc.method": DESCRIBE_METHOD_NAME.encode(),
                b"vgi_rpc.request_version": b"1",
            }
        )
        with ipc.new_stream(req_buf, _EMPTY_SCHEMA) as writer:
            writer.write_batch(
                pa.RecordBatch.from_pydict({}, schema=_EMPTY_SCHEMA),
                custom_metadata=request_metadata,
            )

        resp = _post_with_retry(
            client,
            f"{prefix}/{DESCRIBE_METHOD_NAME}",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        reader = _open_response_stream(resp.content, resp.status_code, ipc_validation)
        # Skip log batches
        while True:
            batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            if not _dispatch_log_or_error(batch, custom_metadata):
                break
        _drain_stream(reader)

        return parse_describe_batch(batch, custom_metadata)
    finally:
        if own_client:
            client.close()

http_capabilities

http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> HttpServerCapabilities

Discover server capabilities via OPTIONS {prefix}/health.

The capability headers (VGI-Max-Request-Bytes, VGI-Upload-URL-Support, VGI-Max-Upload-Bytes) are emitted on every response, but the dedicated discovery target is /health because it is mandatory in every implementation and exempt from auth. The server may include Cache-Control: max-age=N on the OPTIONS response; if so the returned HttpServerCapabilities carries cache_expires_at so callers can refresh on expiry.

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

prefix

URL prefix matching the server's prefix. None auto-detects from _SyncTestClient.

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
HttpServerCapabilities

An HttpServerCapabilities with discovered values.

RAISES DESCRIPTION
ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def http_capabilities(
    base_url: str | None = None,
    *,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> HttpServerCapabilities:
    """Discover server capabilities via ``OPTIONS {prefix}/health``.

    The capability headers (``VGI-Max-Request-Bytes``,
    ``VGI-Upload-URL-Support``, ``VGI-Max-Upload-Bytes``) are emitted on
    every response, but the dedicated discovery target is ``/health``
    because it is mandatory in every implementation and exempt from
    auth.  The server may include ``Cache-Control: max-age=N`` on the
    OPTIONS response; if so the returned ``HttpServerCapabilities``
    carries ``cache_expires_at`` so callers can refresh on expiry.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        An ``HttpServerCapabilities`` with discovered values.

    Raises:
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    import time as _time

    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        url = f"{prefix}/health"
        resp = _options_with_retry(client, url, config=retry)
        headers = resp.headers

        max_req: int | None = None
        raw = headers.get(MAX_REQUEST_BYTES_HEADER) or headers.get(MAX_REQUEST_BYTES_HEADER.lower())
        if raw is not None:
            with contextlib.suppress(ValueError):
                max_req = int(raw)

        max_resp: int | None = None
        raw = headers.get(MAX_RESPONSE_BYTES_HEADER) or headers.get(MAX_RESPONSE_BYTES_HEADER.lower())
        if raw is not None:
            with contextlib.suppress(ValueError):
                max_resp = int(raw)

        max_ext_resp: int | None = None
        raw = headers.get(MAX_EXTERNALIZED_RESPONSE_BYTES_HEADER) or headers.get(
            MAX_EXTERNALIZED_RESPONSE_BYTES_HEADER.lower()
        )
        if raw is not None:
            with contextlib.suppress(ValueError):
                max_ext_resp = int(raw)

        ext_enabled_raw = headers.get(EXTERNALIZATION_ENABLED_HEADER) or headers.get(
            EXTERNALIZATION_ENABLED_HEADER.lower()
        )
        ext_enabled = ext_enabled_raw == "true" if ext_enabled_raw is not None else False

        upload_raw = headers.get(UPLOAD_URL_HEADER) or headers.get(UPLOAD_URL_HEADER.lower())
        upload_support = upload_raw == "true" if upload_raw is not None else False

        max_upload: int | None = None
        upload_bytes_raw = headers.get(MAX_UPLOAD_BYTES_HEADER) or headers.get(MAX_UPLOAD_BYTES_HEADER.lower())
        if upload_bytes_raw is not None:
            with contextlib.suppress(ValueError):
                max_upload = int(upload_bytes_raw)

        supported_raw = headers.get(SUPPORTED_ENCODINGS_HEADER) or headers.get(SUPPORTED_ENCODINGS_HEADER.lower())
        if supported_raw:
            parsed = tuple(parse_encoding_list(supported_raw))
            # Empty parse (e.g. server advertised codecs we don't recognise)
            # falls back to zstd-only — the historical behaviour.
            supported_encodings = parsed if parsed else (Encoding.ZSTD,)
        else:
            supported_encodings = (Encoding.ZSTD,)

        # Honour Cache-Control: max-age=N for refresh scheduling.
        cache_expires_at: float | None = None
        cc = headers.get("Cache-Control") or headers.get("cache-control")
        if cc:
            for token in cc.split(","):
                t = token.strip().lower()
                if t.startswith("max-age="):
                    with contextlib.suppress(ValueError):
                        cache_expires_at = _time.monotonic() + float(t[len("max-age=") :])
                    break

        sticky_enabled_raw = headers.get(STICKY_ENABLED_HEADER) or headers.get(STICKY_ENABLED_HEADER.lower())
        sticky_enabled = sticky_enabled_raw == "true" if sticky_enabled_raw is not None else False

        sticky_ttl: int | None = None
        sticky_ttl_raw = headers.get(STICKY_DEFAULT_TTL_HEADER) or headers.get(STICKY_DEFAULT_TTL_HEADER.lower())
        if sticky_ttl_raw is not None:
            with contextlib.suppress(ValueError):
                sticky_ttl = int(sticky_ttl_raw)

        sticky_echo_raw = headers.get(STICKY_ECHO_HEADERS_HEADER) or headers.get(STICKY_ECHO_HEADERS_HEADER.lower())
        sticky_echo: tuple[str, ...]
        if sticky_echo_raw:
            sticky_echo = tuple(name.strip() for name in sticky_echo_raw.split(",") if name.strip())
        else:
            sticky_echo = ()

        return HttpServerCapabilities(
            max_request_bytes=max_req,
            max_response_bytes=max_resp,
            max_externalized_response_bytes=max_ext_resp,
            externalization_enabled=ext_enabled,
            upload_url_support=upload_support,
            max_upload_bytes=max_upload,
            supported_encodings=supported_encodings,
            cache_expires_at=cache_expires_at,
            sticky_enabled=sticky_enabled,
            sticky_default_ttl=sticky_ttl,
            sticky_echo_headers=sticky_echo,
        )
    finally:
        if own_client:
            client.close()
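
The Cache-Control handling above can be reproduced in isolation. The following is a minimal stdlib-only sketch, not the library's code: the helper names expiry_from_cache_control and needs_refresh are hypothetical, and the clock value is passed in explicitly so the logic is easy to test. It mirrors the behaviour of the listing: the first max-age directive wins, and a malformed value yields no expiry hint.

```python
import time
from typing import Optional


def expiry_from_cache_control(cache_control: Optional[str], now: float) -> Optional[float]:
    """Return ``now + max-age`` or None when no usable max-age directive exists."""
    if not cache_control:
        return None
    for token in cache_control.split(","):
        directive = token.strip().lower()
        if directive.startswith("max-age="):
            try:
                return now + float(directive[len("max-age="):])
            except ValueError:
                # Malformed value: treat it as "no expiry hint".
                return None
    return None


def needs_refresh(cache_expires_at: Optional[float], now: float) -> bool:
    """True once a known expiry has passed; None never forces a refresh."""
    return cache_expires_at is not None and now >= cache_expires_at


# Usage: compute an expiry against the monotonic clock, as the listing does.
expires_at = expiry_from_cache_control("public, max-age=60", time.monotonic())
```

A caller holding an HttpServerCapabilities could apply the same comparison to its cache_expires_at field and call http_capabilities() again once it returns True.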

request_upload_urls

request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str | None = None,
    client: Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None
) -> list[UploadUrl]

Request pre-signed upload URLs from the server's __upload_url__ endpoint.

The server must have been configured with an upload_url_provider in make_wsgi_app().

PARAMETER DESCRIPTION
base_url

Base URL of the server (e.g. http://localhost:8000). Required when client is None.

TYPE: str | None DEFAULT: None

count

Number of upload URLs to request (default 1, max 100).

TYPE: int DEFAULT: 1

prefix

URL prefix matching the server's prefix. None auto-detects from _SyncTestClient.

TYPE: str | None DEFAULT: None

client

Optional HTTP client (httpx.Client or _SyncTestClient).

TYPE: Client | _SyncTestClient | None DEFAULT: None

retry

Optional retry configuration for transient HTTP failures.

TYPE: HttpRetryConfig | None DEFAULT: None

RETURNS DESCRIPTION
list[UploadUrl]

A list of UploadUrl objects with pre-signed PUT and GET URLs.

RAISES DESCRIPTION
RpcError

If the server does not support upload URLs (404) or returns an error.

ValueError

If base_url is None and client is None.

Source code in vgi_rpc/http/_client.py
def request_upload_urls(
    base_url: str | None = None,
    *,
    count: int = 1,
    prefix: str | None = None,
    client: httpx.Client | _SyncTestClient | None = None,
    retry: HttpRetryConfig | None = None,
) -> list[UploadUrl]:
    """Request pre-signed upload URLs from the server's ``__upload_url__`` endpoint.

    The server must have been configured with an ``upload_url_provider``
    in ``make_wsgi_app()``.

    Args:
        base_url: Base URL of the server (e.g. ``http://localhost:8000``).
            Required when *client* is ``None``.
        count: Number of upload URLs to request (default 1, max 100).
        prefix: URL prefix matching the server's prefix.  ``None``
            auto-detects from ``_SyncTestClient``.
        client: Optional HTTP client (``httpx.Client`` or ``_SyncTestClient``).
        retry: Optional retry configuration for transient HTTP failures.

    Returns:
        A list of ``UploadUrl`` objects with pre-signed PUT and GET URLs.

    Raises:
        RpcError: If the server does not support upload URLs (404) or
            returns an error.
        ValueError: If *base_url* is ``None`` and *client* is ``None``.

    """
    own_client = client is None
    if client is None:
        if base_url is None:
            raise ValueError("base_url is required when client is not provided")
        client = httpx.Client(base_url=base_url, follow_redirects=True)
    if prefix is None:
        prefix = getattr(client, "prefix", "")

    try:
        # Build request IPC with standard wire protocol metadata
        req_buf = BytesIO()
        _write_request(req_buf, _UPLOAD_URL_METHOD, _UPLOAD_URL_PARAMS_SCHEMA, {"count": count})

        resp = _post_with_retry(
            client,
            f"{prefix}/__upload_url__/init",
            content=req_buf.getvalue(),
            headers={"Content-Type": _ARROW_CONTENT_TYPE},
            config=retry,
        )

        # Without an upload_url_provider the route doesn't exist and the
        # request falls through to _StreamInitResource → 404.
        if resp.status_code == HTTPStatus.NOT_FOUND:
            raise RpcError("NotSupported", "Server does not support upload URLs", "")

        reader = _open_response_stream(resp.content, resp.status_code)
        urls: list[UploadUrl] = []
        try:
            while True:
                try:
                    batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
                except StopIteration:
                    break

                if _dispatch_log_or_error(batch, custom_metadata):
                    continue

                for i in range(batch.num_rows):
                    upload_url = batch.column("upload_url")[i].as_py()
                    download_url = batch.column("download_url")[i].as_py()
                    expires_at = batch.column("expires_at")[i].as_py()
                    urls.append(UploadUrl(upload_url=upload_url, download_url=download_url, expires_at=expires_at))
        except RpcError:
            _drain_stream(reader)
            raise
        _drain_stream(reader)
        return urls
    finally:
        if own_client:
            client.close()
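
Because count is documented as capped at 100 per call, a caller that needs more URLs has to issue several requests. The sketch below shows one way to split a total into per-call counts. It is an illustration only: plan_url_requests is a hypothetical helper, and how the server reacts to a count above 100 is not stated in this reference.

```python
def plan_url_requests(total: int, max_per_request: int = 100) -> list[int]:
    """Split ``total`` upload URLs into per-call counts no larger than the cap."""
    if total < 0:
        raise ValueError("total must be non-negative")
    if max_per_request < 1:
        raise ValueError("max_per_request must be at least 1")
    full, remainder = divmod(total, max_per_request)
    counts = [max_per_request] * full
    if remainder:
        counts.append(remainder)
    return counts


# Each entry would become one request_upload_urls(..., count=n) call.
plan = plan_url_requests(250)
```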

Capabilities

HttpServerCapabilities dataclass

HttpServerCapabilities(
    max_request_bytes: int | None = None,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    externalization_enabled: bool = False,
    upload_url_support: bool = False,
    max_upload_bytes: int | None = None,
    supported_encodings: tuple[Encoding, ...] = (ZSTD,),
    cache_expires_at: float | None = None,
    sticky_enabled: bool = False,
    sticky_default_ttl: int | None = None,
    sticky_echo_headers: tuple[str, ...] = (),
)

Capabilities advertised by an HTTP RPC server.

Discovered via OPTIONS {prefix}/health (or any other route — the headers are emitted on every response). The server may include a Cache-Control: max-age=N header on the OPTIONS response; the client honours that and refreshes when cache_expires_at lapses.

ATTRIBUTE DESCRIPTION
max_request_bytes

Maximum request body size the server advertises, or None if the server does not advertise a limit. The server returns 413 Payload Too Large for inline bodies above this; clients should externalize via the upload-URL flow.

TYPE: int | None

max_response_bytes

HTTP body cap the server advertises for its own responses, or None if no body cap is configured. Conformance tests sizing oversized payloads should multiply this by a comfortable factor to provably overshoot.

TYPE: int | None

max_externalized_response_bytes

Cap on per-response externalised payload bytes, or None if no external cap is configured.

TYPE: int | None

externalization_enabled

True iff the server has a storage backend wired up. When False, externalisation cannot rescue an oversize response; conformance tests for the externalised strict-fail path should skip.

TYPE: bool

upload_url_support

Whether the server exposes __upload_url__/init for client-vended pointer-batch uploads.

TYPE: bool

max_upload_bytes

Maximum upload size the server advertises for client-vended URLs, or None if not advertised.

TYPE: int | None

supported_encodings

Content-encoding codecs the server can decompress on request bodies and re-encode for responses. Parsed from the VGI-Supported-Encodings response header; falls back to (Encoding.ZSTD,) when the header is missing or lists no codec the client recognises. This matches the pre-gzip server, which only ever accepted zstd.

TYPE: tuple[Encoding, ...]

cache_expires_at

Monotonic timestamp (time.monotonic()) at which this snapshot of the capabilities should be re-probed. None means no expiry hint was given.

TYPE: float | None

sticky_enabled class-attribute instance-attribute

sticky_enabled: bool = False

Whether the server has enable_sticky=True and supports VGI-Session.

sticky_default_ttl class-attribute instance-attribute

sticky_default_ttl: int | None = None

Default session TTL in seconds when open_session is called without an explicit TTL.

sticky_echo_headers class-attribute instance-attribute

sticky_echo_headers: tuple[str, ...] = ()

Header names the server tells the client to echo on every subsequent session request.

Parsed from the comma-separated VGI-Sticky-Echo-Headers capability header. Empty tuple when the server is sticky-enabled but has no echo-header config (the default), or when the server is non-sticky. Concrete values land on the _SessionView via captured VGI-Echo-<name> response headers on the session-opening response; this field exposes the names for introspection (LB configuration, cross-language client implementations).
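
The size-related attributes (max_request_bytes, upload_url_support, max_upload_bytes) suggest a simple client-side decision before sending a body. The sketch below is one possible reading, not the library's logic: SizeCaps is a hypothetical stand-in that mirrors three fields of HttpServerCapabilities, and the returned labels are invented for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SizeCaps:
    """Hypothetical stand-in mirroring three HttpServerCapabilities fields."""

    max_request_bytes: Optional[int] = None
    upload_url_support: bool = False
    max_upload_bytes: Optional[int] = None


def choose_request_path(body_len: int, caps: SizeCaps) -> str:
    """Return "inline", "upload", or "too_large" for a body of ``body_len`` bytes."""
    # No advertised limit, or within the limit: send the body inline.
    if caps.max_request_bytes is None or body_len <= caps.max_request_bytes:
        return "inline"
    # Over the inline limit: only the upload-URL flow can carry the body.
    if not caps.upload_url_support:
        return "too_large"
    if caps.max_upload_bytes is not None and body_len > caps.max_upload_bytes:
        return "too_large"
    return "upload"


caps = SizeCaps(max_request_bytes=1000, upload_url_support=True, max_upload_bytes=5000)
decision = choose_request_path(1001, caps)
```

Checking up front avoids a round trip that would end in 413 Payload Too Large, although a server-side rejection still has to be handled because the advertised values may be stale.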

Stream Session

HttpStreamSession

HttpStreamSession(
    client: Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
    compression_level: int | None = None
)

Client-side handle for a stream over HTTP (both producer and exchange patterns).

For producer streams, use __iter__() — yields batches from batched responses and follows continuation tokens transparently. For exchange streams, use exchange() — sends an input batch and receives an output batch.

Supports context manager protocol for convenience.

Initialize with HTTP client, method details, and initial state.

Source code in vgi_rpc/http/_client.py
def __init__(
    self,
    client: httpx.Client | _SyncTestClient,
    url_prefix: str,
    method: str,
    state_bytes: bytes | None,
    output_schema: pa.Schema,
    on_log: Callable[[Message], None] | None = None,
    *,
    external_config: ExternalLocationConfig | None = None,
    ipc_validation: IpcValidation = IpcValidation.FULL,
    pending_batches: list[AnnotatedBatch] | None = None,
    finished: bool = False,
    header: object | None = None,
    retry_config: HttpRetryConfig | None = None,
    compression_level: int | None = None,
) -> None:
    """Initialize with HTTP client, method details, and initial state."""
    self._client = client
    self._url_prefix = url_prefix
    self._method = method
    self._state_bytes = state_bytes
    self._output_schema = output_schema
    self._on_log = on_log
    self._external_config = external_config
    self._ipc_validation = ipc_validation
    self._pending_batches: list[AnnotatedBatch] = pending_batches or []
    self._finished = finished
    self._header = header
    self._retry_config = retry_config
    self._compression_level = compression_level
    self._capabilities: HttpServerCapabilities | None = None

header property

header: object | None

The stream header, or None if the stream has no header.

typed_header

typed_header(header_type: type[H]) -> H

Return the stream header narrowed to the expected type.

PARAMETER DESCRIPTION
header_type

The expected header dataclass type.

TYPE: type[H]

RETURNS DESCRIPTION
H

The header, typed as header_type.

RAISES DESCRIPTION
TypeError

If the header is None or not an instance of header_type.

Source code in vgi_rpc/http/_client.py
def typed_header[H: ArrowSerializableDataclass](self, header_type: type[H]) -> H:
    """Return the stream header narrowed to the expected type.

    Args:
        header_type: The expected header dataclass type.

    Returns:
        The header, typed as *header_type*.

    Raises:
        TypeError: If the header is ``None`` or not an instance of
            *header_type*.

    """
    if self._header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(self._header, header_type):
        raise TypeError(f"Header type mismatch: expected {header_type.__name__}, got {type(self._header).__name__}")
    return self._header
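
The narrowing performed by typed_header can be shown without a live stream. This is a standalone sketch under stated assumptions: narrow_header is a hypothetical free function that reproduces the two checks from the listing, and JobHeader and OtherHeader are made-up dataclasses rather than types from the library.

```python
from dataclasses import dataclass
from typing import Optional, Type, TypeVar

H = TypeVar("H")


@dataclass
class JobHeader:
    """Hypothetical stream header used only for this example."""

    job_id: str


@dataclass
class OtherHeader:
    """A second hypothetical header type, used to show the mismatch case."""

    note: str


def narrow_header(header: Optional[object], header_type: Type[H]) -> H:
    """Return ``header`` typed as ``header_type`` or raise TypeError."""
    if header is None:
        raise TypeError(f"Stream has no header (expected {header_type.__name__})")
    if not isinstance(header, header_type):
        raise TypeError(
            f"Header type mismatch: expected {header_type.__name__}, "
            f"got {type(header).__name__}"
        )
    return header


job = narrow_header(JobHeader(job_id="j1"), JobHeader)
```

With a real session the equivalent call would be session.typed_header(JobHeader), which lets a type checker see the specific header type instead of object | None.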

exchange

exchange(input_batch: AnnotatedBatch) -> AnnotatedBatch

Send an input batch and receive the output batch.

PARAMETER DESCRIPTION
input_batch

The input batch to send.

TYPE: AnnotatedBatch

RETURNS DESCRIPTION
AnnotatedBatch

The output batch from the server.

RAISES DESCRIPTION
RpcError

If the server reports an error or the stream has finished.

Source code in vgi_rpc/http/_client.py
def exchange(self, input_batch: AnnotatedBatch) -> AnnotatedBatch:
    """Send an input batch and receive the output batch.

    Args:
        input_batch: The input batch to send.

    Returns:
        The output batch from the server.

    Raises:
        RpcError: If the server reports an error or the stream has finished.

    """
    if self._state_bytes is None:
        raise RpcError("ProtocolError", "Stream has finished — no state token available", "")

    batch_to_write = input_batch.batch
    cm_to_write = input_batch.custom_metadata

    # Build the inline body first; auto-externalization (if needed)
    # then operates on the serialized bytes via the server-vended
    # upload-URL flow.  The state token is on the outer batch, so
    # _build_pointer_request_body preserves it on the pointer.
    req_buf = BytesIO()
    state_md = pa.KeyValueMetadata({STATE_KEY: self._state_bytes})
    merged = merge_metadata(cm_to_write, state_md)
    with ipc.new_stream(req_buf, batch_to_write.schema) as writer:
        writer.write_batch(batch_to_write, custom_metadata=merged)
    body = self._maybe_externalize_request(req_buf.getvalue())

    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange: method=%s, input=%s",
            self._method,
            fmt_batch(batch_to_write),
        )
    # Exchange calls are NOT retried: the server's process() method may
    # have side effects, and a proxy 502 after server processing would
    # cause duplicate execution.  Only init/unary/continuation are retried.
    resp = self._client.post(
        f"{self._url_prefix}/{self._method}/exchange",
        content=self._prepare_body(body),
        headers=self._build_headers(),
    )
    if resp.status_code == HTTPStatus.REQUEST_ENTITY_TOO_LARGE:
        body = self._externalize_request_body(body)
        resp = self._client.post(
            f"{self._url_prefix}/{self._method}/exchange",
            content=self._prepare_body(body),
            headers=self._build_headers(),
        )
    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug(
            "HTTP stream exchange response: method=%s, status=%d, size=%d",
            self._method,
            resp.status_code,
            len(resp.content),
        )

    # Read response — log batches + data batch with state
    reader = _open_response_stream(resp.content, resp.status_code, self._ipc_validation)
    try:
        ab = _read_batch_with_log_check(reader, self._on_log, self._external_config)
    except RpcError:
        _drain_stream(reader)
        raise

    # Extract updated state from metadata
    if ab.custom_metadata is not None:
        new_state = ab.custom_metadata.get(STATE_KEY)
        if new_state is not None:
            self._state_bytes = new_state

    # Strip state token from user-visible metadata
    user_cm = strip_keys(ab.custom_metadata, STATE_KEY)

    _drain_stream(reader)
    return AnnotatedBatch(batch=ab.batch, custom_metadata=user_cm)
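
The 413 handling in exchange() amounts to a single externalize-and-resend step rather than a general retry loop. The sketch below isolates that control flow with plain callables. The names post_with_externalize_fallback, post, and externalize are hypothetical, and the fake transport in the usage lines is invented for the example.

```python
from typing import Callable

PAYLOAD_TOO_LARGE = 413


def post_with_externalize_fallback(
    post: Callable[[bytes], int],
    body: bytes,
    externalize: Callable[[bytes], bytes],
) -> tuple[int, int]:
    """Post ``body``; on 413, externalize it and post exactly once more.

    Returns ``(final_status, attempts)``.
    """
    attempts = 1
    status = post(body)
    if status == PAYLOAD_TOO_LARGE:
        # Replace the inline body with a small pointer body and resend once.
        attempts += 1
        status = post(externalize(body))
    return status, attempts


def fake_post(body: bytes) -> int:
    """Invented transport: rejects bodies larger than 10 bytes."""
    return PAYLOAD_TOO_LARGE if len(body) > 10 else 200


result = post_with_externalize_fallback(fake_post, b"x" * 50, lambda _body: b"ptr")
```

The resend is safe here because a 413 means the server rejected the body before processing it. Other failures are deliberately not retried, since process() may already have run with side effects.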

__iter__

__iter__() -> Iterator[AnnotatedBatch]

Iterate over output batches from a producer stream.

Yields pre-loaded batches from init, then follows continuation tokens.

Source code in vgi_rpc/http/_client.py
def __iter__(self) -> Iterator[AnnotatedBatch]:
    """Iterate over output batches from a producer stream.

    Yields pre-loaded batches from init, then follows continuation tokens.
    """
    # Yield pre-loaded batches from init response
    yield from self._pending_batches
    self._pending_batches.clear()

    if self._finished:
        return

    # Follow continuation tokens
    if self._state_bytes is None:
        return

    reader: ValidatedReader | None = None
    try:
        reader = self._send_continuation(self._state_bytes)
        while True:
            try:
                batch, custom_metadata = reader.read_next_batch_with_custom_metadata()
            except StopIteration:
                break

            # Check for continuation token (zero-row batch with STATE_KEY)
            if batch.num_rows == 0 and custom_metadata is not None:
                token = custom_metadata.get(STATE_KEY)
                if token is not None:
                    if not isinstance(token, bytes):
                        raise TypeError(f"Expected bytes for state token, got {type(token).__name__}")
                    _drain_stream(reader)
                    reader = self._send_continuation(token)
                    continue

            # Dispatch log/error batches
            if _dispatch_log_or_error(batch, custom_metadata, self._on_log):
                continue

            resolved_batch, resolved_cm = resolve_external_location(
                batch, custom_metadata, self._external_config, self._on_log, reader.ipc_validation
            )
            yield AnnotatedBatch(batch=resolved_batch, custom_metadata=resolved_cm)
    except RpcError:
        if reader is not None:
            _drain_stream(reader)
        raise
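
The continuation-following loop can be modelled with in-memory pages. In this sketch, which is an illustration and not the library's implementation, a batch is a (rows, metadata) tuple, STATE_KEY is a made-up key, and fetch stands in for the HTTP continuation request. A zero-row batch carrying the state key switches the loop to the next page; every other batch is yielded.

```python
from collections.abc import Callable, Iterator
from typing import Optional

STATE_KEY = b"state"  # hypothetical metadata key for this example

Batch = tuple[list[int], Optional[dict[bytes, bytes]]]


def iter_with_continuation(
    fetch: Callable[[bytes], list[Batch]], token: bytes
) -> Iterator[list[int]]:
    """Yield row lists, following continuation tokens across pages."""
    page = iter(fetch(token))
    while True:
        try:
            rows, metadata = next(page)
        except StopIteration:
            return  # page ended without a continuation token
        if not rows and metadata is not None and STATE_KEY in metadata:
            # Zero-row batch with a token: request the next page.
            page = iter(fetch(metadata[STATE_KEY]))
            continue
        yield rows


pages: dict[bytes, list[Batch]] = {
    b"t0": [([1, 2], None), ([], {STATE_KEY: b"t1"})],
    b"t1": [([3], None)],
}
collected = list(iter_with_continuation(pages.__getitem__, b"t0"))
```

Callers see one flat sequence of batches and never handle tokens themselves, which is what "follows continuation tokens transparently" means for producer streams.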

close

close() -> None

Close the session (no-op for HTTP — stateless).

Source code in vgi_rpc/http/_client.py
def close(self) -> None:
    """Close the session (no-op for HTTP — stateless)."""

cancel

cancel() -> None

Signal the server to discard stream state and stop processing.

Sends a POST {prefix}/{method}/exchange carrying vgi_rpc.cancel metadata alongside the current state token. The server invokes state.on_cancel(ctx) (if defined) and releases the state.

Idempotent and best-effort: network failures are swallowed. After cancel(), the session is marked finished; further exchange() calls raise RpcError, and iteration yields any batches still buffered from init and then stops without contacting the server.

Source code in vgi_rpc/http/_client.py
def cancel(self) -> None:
    """Signal the server to discard stream state and stop processing.

    Sends a ``POST {prefix}/{method}/exchange`` carrying ``vgi_rpc.cancel``
    metadata alongside the current state token. The server invokes
    ``state.on_cancel(ctx)`` (if defined) and releases the state.

    Idempotent and best-effort: network failures are swallowed. After
    ``cancel()``, the session is marked finished; further ``exchange()``
    calls raise ``RpcError``, and iteration yields any batches still
    buffered from init and then stops without contacting the server.
    """
    if self._finished or self._state_bytes is None:
        self._finished = True
        self._state_bytes = None
        return
    token = self._state_bytes
    self._finished = True
    self._state_bytes = None
    if wire_http_logger.isEnabledFor(logging.DEBUG):
        wire_http_logger.debug("HTTP stream cancel: method=%s", self._method)
    req_buf = BytesIO()
    cancel_md = pa.KeyValueMetadata({STATE_KEY: token, CANCEL_KEY: b"1"})
    with ipc.new_stream(req_buf, _EMPTY_SCHEMA) as writer:
        writer.write_batch(empty_batch(_EMPTY_SCHEMA), custom_metadata=cancel_md)
    try:
        resp = self._client.post(
            f"{self._url_prefix}/{self._method}/exchange",
            content=self._prepare_body(req_buf.getvalue()),
            headers=self._build_headers(),
        )
    except Exception:
        return
    with contextlib.suppress(Exception):
        reader = _open_response_stream(resp.content, resp.status_code, self._ipc_validation)
        _drain_stream(reader)
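
The idempotent, best-effort behaviour of cancel() comes down to clearing local state before sending and ignoring transport failures. The following sketch reduces it to that state machine. CancellableSession and its send callable are hypothetical stand-ins, not library types.

```python
from typing import Callable, Optional


class CancellableSession:
    """Hypothetical stand-in showing cancel-once, swallow-errors semantics."""

    def __init__(self, send: Callable[[bytes], None], token: Optional[bytes]) -> None:
        self._send = send
        self.token = token
        self.finished = False

    def cancel(self) -> None:
        token = self.token
        was_finished = self.finished
        # Mark the session finished first, so a failed send cannot leave it open.
        self.finished = True
        self.token = None
        if was_finished or token is None:
            return
        try:
            self._send(token)
        except Exception:
            # Best-effort: a network failure must not surface to the caller.
            pass


sent: list[bytes] = []
session = CancellableSession(sent.append, b"tok")
session.cancel()
session.cancel()  # second call is a no-op
```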

__enter__

__enter__() -> HttpStreamSession

Enter the context.

Source code in vgi_rpc/http/_client.py
def __enter__(self) -> HttpStreamSession:
    """Enter the context."""
    return self

__exit__

__exit__(
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None

Exit the context.

Source code in vgi_rpc/http/_client.py
def __exit__(
    self,
    _exc_type: type[BaseException] | None,
    _exc_val: BaseException | None,
    _exc_tb: TracebackType | None,
) -> None:
    """Exit the context."""
    self.close()

Sticky Sessions

DrainHandle dataclass

DrainHandle(
    drain: Callable[[], None],
    shutdown: Callable[[], None],
    is_draining: Callable[[], bool],
)

Operator-facing handle for triggering graceful drain on a sticky-enabled WSGI app.

Returned by drain_handle() when called against an app built by vgi_rpc.http.make_wsgi_app() with enable_sticky=True. Provides the two operations operators need to wire up SIGTERM handlers, pre-fork worker-exit hooks (gunicorn worker_exit), or custom shutdown logic:

  • drain(): flips the registry's drain flag so subsequent ctx.open_session calls raise ServerDrainingError. Calls on existing sessions continue to be served until TTL or explicit close.
  • shutdown(): invokes state.close() on every live session and clears the registry. Use after the operator-controlled grace period.

Both methods are idempotent and thread-safe (they delegate to the lock-guarded methods of _SessionRegistry).

drain instance-attribute

drain: Callable[[], None]

Set the registry's drain flag; new ctx.open_session calls raise ServerDrainingError.

shutdown instance-attribute

shutdown: Callable[[], None]

Invoke state.close() on every live session and clear the registry.

is_draining instance-attribute

is_draining: Callable[[], bool]

Return whether drain() has been invoked.
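
The drain and shutdown semantics can be sketched with a toy registry. Everything in this block is a stand-in invented for illustration (ToyRegistry, ToyDrainHandle, DrainingError, build_handle); the real _SessionRegistry is not shown in this reference and may differ. The point is only the shape: three closures over one lock-guarded object.

```python
import threading
from dataclasses import dataclass
from typing import Callable


class DrainingError(RuntimeError):
    """Stand-in for ServerDrainingError."""


class ToyRegistry:
    """Toy session registry with a drain flag, guarded by a lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._draining = False
        self._sessions: dict[str, Callable[[], None]] = {}

    def open_session(self, session_id: str, close: Callable[[], None]) -> None:
        with self._lock:
            if self._draining:
                raise DrainingError("server is draining")
            self._sessions[session_id] = close

    def drain(self) -> None:
        with self._lock:
            self._draining = True

    def shutdown(self) -> None:
        # Swap the dict out under the lock, then close outside it.
        with self._lock:
            sessions, self._sessions = self._sessions, {}
        for close in sessions.values():
            close()

    def is_draining(self) -> bool:
        with self._lock:
            return self._draining


@dataclass(frozen=True)
class ToyDrainHandle:
    drain: Callable[[], None]
    shutdown: Callable[[], None]
    is_draining: Callable[[], bool]


def build_handle(registry: ToyRegistry) -> ToyDrainHandle:
    """Expose only the three operator-facing operations of the registry."""
    return ToyDrainHandle(registry.drain, registry.shutdown, registry.is_draining)
```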

drain_handle

drain_handle(
    app: App[Request, Response],
) -> DrainHandle | None

Return a DrainHandle for app, or None if sticky is not enabled.

Inspects the Falcon app's middleware tuple to find the _StickyMiddleware instance, then constructs closures over its registry. Returns None cleanly for non-sticky apps so operator code can branch with if (handle := drain_handle(app)) is not None: ....

Used by vgi_rpc.http.serve_http() for its SIGTERM wiring, and exposed publicly so operators running under gunicorn / uwsgi / their own WSGI launcher can wire equivalent shutdown hooks. See the spec at docs/sticky-sessions-spec.md for the pre-fork worker-exit recipe.

Source code in vgi_rpc/http/server/_sticky.py
def drain_handle(app: falcon.App[falcon.Request, falcon.Response]) -> DrainHandle | None:
    """Return a :class:`DrainHandle` for *app*, or ``None`` if sticky is not enabled.

    Inspects the Falcon app's middleware tuple to find the
    :class:`_StickyMiddleware` instance, then constructs closures over its
    registry. Returns ``None`` cleanly for non-sticky apps so operator code
    can branch with ``if (handle := drain_handle(app)) is not None: ...``.

    Used by :func:`vgi_rpc.http.serve_http` for its SIGTERM wiring, and
    exposed publicly so operators running under gunicorn / uwsgi / their
    own WSGI launcher can wire equivalent shutdown hooks. See the spec at
    ``docs/sticky-sessions-spec.md`` for the pre-fork worker-exit recipe.
    """
    # Falcon stores middleware as a tuple of three tuples:
    # (request-handlers, resource-handlers, response-handlers).
    # Each handler is a bound method on a middleware instance; we walk
    # them all to find a _StickyMiddleware. Iteration order is stable
    # within Falcon's implementation but we don't rely on it — we just
    # find the first sticky instance and stop.
    middleware_groups: tuple[tuple[object, ...], ...] = getattr(app, "_middleware", ())
    for group in middleware_groups:
        for bound_method in group:
            owner = getattr(bound_method, "__self__", None)
            if isinstance(owner, _StickyMiddleware):
                return _build_drain_handle(owner._registry)
    return None
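
An operator-side shutdown hook built on this function would typically drain, wait out a grace period, then shut down. The sketch below shows that sequencing with the handle passed in, so it runs without a Falcon app. graceful_stop is a hypothetical helper, and the sleep parameter exists only to keep the example testable.

```python
import time
from typing import Any, Callable, Optional


def graceful_stop(
    handle: Optional[Any],
    grace_seconds: float,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Drain, wait, then shut down. Returns False when there is no handle."""
    if handle is None:
        # Non-sticky app: drain_handle() returned None, nothing to do.
        return False
    handle.drain()        # refuse new sessions
    sleep(grace_seconds)  # let existing sessions finish
    handle.shutdown()     # close whatever is still live
    return True
```

In a real deployment the first argument would be the result of drain_handle(app), and the call would sit inside a SIGTERM handler or a gunicorn worker_exit hook.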

Fly.io quickstart

FLY_MACHINE_ID module-attribute

FLY_MACHINE_ID: str | None = get('FLY_MACHINE_ID')

The current Fly Machine ID, or None outside Fly.

Read once at module import. Fly Machines have stable IDs that persist across restarts of the same Machine, so caching at import time is safe.

auto_server_id

auto_server_id() -> str | None

Return FLY_MACHINE_ID if running on Fly, else None.

Use as RpcServer(server_id=auto_server_id()) to make the session token's stamped server identity match the Fly Machine ID. The framework's session-token format embeds server_id length-prefixed, so this works for any length of identifier — Fly Machine IDs are 14 hex characters today but the contract doesn't depend on that.

Returns None outside Fly so RpcServer falls back to its default random 12-char hex server_id.

Source code in vgi_rpc/http/fly.py
def auto_server_id() -> str | None:
    """Return ``FLY_MACHINE_ID`` if running on Fly, else ``None``.

    Use as ``RpcServer(server_id=auto_server_id())`` to make the session
    token's stamped server identity match the Fly Machine ID. The
    framework's session-token format embeds ``server_id`` length-prefixed,
    so this works for any length of identifier — Fly Machine IDs are
    14 hex characters today but the contract doesn't depend on that.

    Returns ``None`` outside Fly so RpcServer falls back to its
    default random 12-char hex ``server_id``.
    """
    return FLY_MACHINE_ID
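
The fallback described above can be sketched without the library. This is a minimal illustration, not the vgi_rpc implementation: resolve_server_id is a hypothetical helper, the Machine ID is made up, and secrets.token_hex(6) only stands in for whatever RpcServer uses to produce its random 12-char hex default. Unlike FLY_MACHINE_ID, which is read once at import, the helper reads the environment on each call so it can be exercised with a plain dict.

```python
import os
import secrets


def resolve_server_id(environ=None):
    """Pick a server ID: the Fly Machine ID when present, else a random one.

    Hypothetical helper for illustration only; in vgi_rpc the fallback
    happens inside RpcServer when server_id is None.
    """
    env = os.environ if environ is None else environ
    machine_id = env.get("FLY_MACHINE_ID")
    if machine_id is not None:
        return machine_id
    # Stand-in for RpcServer's default: 6 random bytes give 12 hex characters.
    return secrets.token_hex(6)


# Made-up 14-character Machine ID, used only to show the "on Fly" branch.
on_fly = resolve_server_id({"FLY_MACHINE_ID": "148e21ea7d3908"})
off_fly = resolve_server_id({})
print(on_fly, off_fly)
```

On Fly the identifier is stable across restarts of the same Machine, while the random fallback changes each time the process starts.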

fly_sticky_echo_headers

fly_sticky_echo_headers() -> dict[str, str] | None

Return {"fly-force-instance-id": FLY_MACHINE_ID} on Fly, else None.

Use as make_wsgi_app(..., sticky_echo_headers=fly_sticky_echo_headers()). When a method opens a session via ctx.open_session(...) on Fly, the server emits VGI-Echo-fly-force-instance-id: <machine-id> on the response; the client captures and replays it as fly-force-instance-id on every subsequent request in the same session, and fly-proxy routes directly to the owning Machine.

Returns None outside Fly so passing this through unchanged is a no-op in non-Fly environments — operators don't need a conditional.

Source code in vgi_rpc/http/fly.py
def fly_sticky_echo_headers() -> dict[str, str] | None:
    """Return ``{"fly-force-instance-id": FLY_MACHINE_ID}`` on Fly, else ``None``.

    Use as ``make_wsgi_app(..., sticky_echo_headers=fly_sticky_echo_headers())``.
    When a method opens a session via ``ctx.open_session(...)`` on Fly, the
    server emits ``VGI-Echo-fly-force-instance-id: <machine-id>`` on the
    response; the client captures and replays it as ``fly-force-instance-id``
    on every subsequent request in the same session, and fly-proxy routes
    directly to the owning Machine.

    Returns ``None`` outside Fly so passing this through unchanged is a
    no-op in non-Fly environments — operators don't need a conditional.
    """
    if FLY_MACHINE_ID is None:
        return None
    return {"fly-force-instance-id": FLY_MACHINE_ID}
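
The client-side capture-and-replay step can be pictured as follows. This is a sketch under assumptions: capture_echo_headers is a hypothetical name, the VGI-Echo- prefix is taken from the description above, and the real client may store and scope the captured values differently.

```python
ECHO_PREFIX = "vgi-echo-"  # matched case-insensitively, like HTTP header names


def capture_echo_headers(response_headers):
    """Return the headers a client would replay on later session requests.

    Each response header named ``VGI-Echo-<name>`` becomes ``<name>`` with
    the same value; every other header is ignored.
    """
    replay = {}
    for name, value in response_headers.items():
        if name.lower().startswith(ECHO_PREFIX):
            replay[name[len(ECHO_PREFIX):].lower()] = value
    return replay


response_headers = {
    "Content-Type": "application/octet-stream",
    "VGI-Echo-fly-force-instance-id": "148e21ea7d3908",  # made-up Machine ID
}
replay = capture_echo_headers(response_headers)
print(replay)
```

Because the server side returns None outside Fly, no echo header is emitted there and the dictionary of replayed headers stays empty.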

Testing

make_sync_client

make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "",
    token_key: bytes | None = None,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    max_stream_response_bytes: int | None = None,
    authenticate: (
        Callable[[falcon.Request], AuthContext] | None
    ) = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: (
        OAuthResourceMetadata | None
    ) = None,
    enable_sticky: bool = False,
    sticky_default_ttl: float = 300.0,
    sticky_echo_headers: Mapping[str, str] | None = None
) -> _SyncTestClient

Create a synchronous test client for an RpcServer.

Uses falcon.testing.TestClient internally — no real HTTP server needed.

PARAMETER DESCRIPTION
server

The RpcServer to test.

TYPE: RpcServer

prefix

URL prefix for RPC endpoints (default "" — root).

TYPE: str DEFAULT: ''

token_key

AEAD key for sealing stream state tokens (see make_wsgi_app for details).

TYPE: bytes | None DEFAULT: None

max_response_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_externalized_response_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_request_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

max_stream_response_bytes

Deprecated alias for max_response_bytes.

TYPE: int | None DEFAULT: None

authenticate

See make_wsgi_app.

TYPE: Callable[[falcon.Request], AuthContext] | None DEFAULT: None

default_headers

Headers merged into every request (e.g. auth tokens).

TYPE: dict[str, str] | None DEFAULT: None

upload_url_provider

See make_wsgi_app.

TYPE: UploadUrlProvider | None DEFAULT: None

max_upload_bytes

See make_wsgi_app.

TYPE: int | None DEFAULT: None

otel_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

sentry_config

See make_wsgi_app.

TYPE: object | None DEFAULT: None

token_ttl

See make_wsgi_app.

TYPE: int DEFAULT: 3600

compression_level

See make_wsgi_app.

TYPE: int | None DEFAULT: 3

enable_not_found_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_landing_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_describe_page

See make_wsgi_app.

TYPE: bool DEFAULT: True

enable_health_endpoint

See make_wsgi_app.

TYPE: bool DEFAULT: True

repo_url

See make_wsgi_app.

TYPE: str | None DEFAULT: None

oauth_resource_metadata

See make_wsgi_app.

TYPE: OAuthResourceMetadata | None DEFAULT: None

enable_sticky

See make_wsgi_app.

TYPE: bool DEFAULT: False

sticky_default_ttl

See make_wsgi_app.

TYPE: float DEFAULT: 300.0

sticky_echo_headers

See make_wsgi_app.

TYPE: Mapping[str, str] | None DEFAULT: None

RETURNS DESCRIPTION
_SyncTestClient

A sync client that can be passed to http_connect(client=...).

Source code in vgi_rpc/http/_testing.py
def make_sync_client(
    server: RpcServer,
    *,
    prefix: str = "",
    token_key: bytes | None = None,
    max_response_bytes: int | None = None,
    max_externalized_response_bytes: int | None = None,
    max_request_bytes: int | None = None,
    max_stream_response_bytes: int | None = None,
    authenticate: Callable[[falcon.Request], AuthContext] | None = None,
    default_headers: dict[str, str] | None = None,
    upload_url_provider: UploadUrlProvider | None = None,
    max_upload_bytes: int | None = None,
    otel_config: object | None = None,
    sentry_config: object | None = None,
    token_ttl: int = 3600,
    compression_level: int | None = 3,
    enable_not_found_page: bool = True,
    enable_landing_page: bool = True,
    enable_describe_page: bool = True,
    enable_health_endpoint: bool = True,
    repo_url: str | None = None,
    oauth_resource_metadata: OAuthResourceMetadata | None = None,
    enable_sticky: bool = False,
    sticky_default_ttl: float = 300.0,
    sticky_echo_headers: Mapping[str, str] | None = None,
) -> _SyncTestClient:
    """Create a synchronous test client for an RpcServer.

    Uses ``falcon.testing.TestClient`` internally — no real HTTP server needed.

    Args:
        server: The RpcServer to test.
        prefix: URL prefix for RPC endpoints (default ``""`` — root).
        token_key: AEAD key for sealing stream state tokens (see
            ``make_wsgi_app`` for details).
        max_response_bytes: See ``make_wsgi_app``.
        max_externalized_response_bytes: See ``make_wsgi_app``.
        max_request_bytes: See ``make_wsgi_app``.
        max_stream_response_bytes: **Deprecated** alias for
            ``max_response_bytes``.
        authenticate: See ``make_wsgi_app``.
        default_headers: Headers merged into every request (e.g. auth tokens).
        upload_url_provider: See ``make_wsgi_app``.
        max_upload_bytes: See ``make_wsgi_app``.
        otel_config: See ``make_wsgi_app``.
        sentry_config: See ``make_wsgi_app``.
        token_ttl: See ``make_wsgi_app``.
        compression_level: See ``make_wsgi_app``.
        enable_not_found_page: See ``make_wsgi_app``.
        enable_landing_page: See ``make_wsgi_app``.
        enable_describe_page: See ``make_wsgi_app``.
        enable_health_endpoint: See ``make_wsgi_app``.
        repo_url: See ``make_wsgi_app``.
        oauth_resource_metadata: See ``make_wsgi_app``.
        enable_sticky: See ``make_wsgi_app``.
        sticky_default_ttl: See ``make_wsgi_app``.
        sticky_echo_headers: See ``make_wsgi_app``.

    Returns:
        A sync client that can be passed to ``http_connect(client=...)``.

    """
    app = make_wsgi_app(
        server,
        prefix=prefix,
        token_key=token_key,
        max_response_bytes=max_response_bytes,
        max_externalized_response_bytes=max_externalized_response_bytes,
        max_stream_response_bytes=max_stream_response_bytes,
        max_request_bytes=max_request_bytes,
        authenticate=authenticate,
        upload_url_provider=upload_url_provider,
        max_upload_bytes=max_upload_bytes,
        otel_config=otel_config,
        sentry_config=sentry_config,
        token_ttl=token_ttl,
        compression_level=compression_level,
        enable_not_found_page=enable_not_found_page,
        enable_landing_page=enable_landing_page,
        enable_describe_page=enable_describe_page,
        enable_health_endpoint=enable_health_endpoint,
        repo_url=repo_url,
        oauth_resource_metadata=oauth_resource_metadata,
        enable_sticky=enable_sticky,
        sticky_default_ttl=sticky_default_ttl,
        sticky_echo_headers=sticky_echo_headers,
    )
    return _SyncTestClient(app, default_headers=default_headers, prefix=prefix)
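
In the listing above, default_headers is handed to the test client rather than forwarded to make_wsgi_app. A rough picture of "merged into every request" follows. merge_headers is a hypothetical helper, and letting per-request headers win on a key collision is an assumption, since this reference does not state the precedence. The sketch also treats header names as case-sensitive for brevity.

```python
def merge_headers(default_headers, request_headers=None):
    """Combine client-wide default headers with the headers of one request."""
    merged = dict(default_headers or {})
    # Assumption: a header set on the individual request overrides the default.
    merged.update(request_headers or {})
    return merged


defaults = {"Authorization": "Bearer test-token"}
plain = merge_headers(defaults)
custom = merge_headers(defaults, {"X-Request-Id": "42"})
override = merge_headers(defaults, {"Authorization": "Bearer other"})
print(plain, custom, override)
```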

Header Constants

MAX_REQUEST_BYTES_HEADER module-attribute

MAX_REQUEST_BYTES_HEADER = 'VGI-Max-Request-Bytes'

MAX_UPLOAD_BYTES_HEADER module-attribute

MAX_UPLOAD_BYTES_HEADER = 'VGI-Max-Upload-Bytes'

UPLOAD_URL_HEADER module-attribute

UPLOAD_URL_HEADER = 'VGI-Upload-URL-Support'
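
A client could use these names to read limits from a response. The sketch below assumes the two byte limits arrive as decimal integers and are simply absent when no limit applies. Neither the value format nor the helper name read_byte_limit comes from this reference, and the constants are redefined locally so the snippet runs without the library.

```python
MAX_REQUEST_BYTES_HEADER = "VGI-Max-Request-Bytes"
MAX_UPLOAD_BYTES_HEADER = "VGI-Max-Upload-Bytes"


def read_byte_limit(headers, header_name):
    """Return the integer limit carried by header_name, or None if unusable."""
    # HTTP header names are case-insensitive, so compare in lower case.
    lowered = {key.lower(): value for key, value in headers.items()}
    raw = lowered.get(header_name.lower())
    if raw is None:
        return None
    try:
        return int(raw)
    except ValueError:
        # Assumption: a value that is not an integer is treated as "no limit known".
        return None


headers = {"vgi-max-request-bytes": "1048576"}
request_limit = read_byte_limit(headers, MAX_REQUEST_BYTES_HEADER)
upload_limit = read_byte_limit(headers, MAX_UPLOAD_BYTES_HEADER)
print(request_limit, upload_limit)
```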