Skip to content

QUIC substreams: independent streams for audio, video, and input#4864

Open
andrewachen wants to merge 8 commits intoXpra-org:masterfrom
andrewachen:fix/quic-substreams-master
Open

QUIC substreams: independent streams for audio, video, and input#4864
andrewachen wants to merge 8 commits intoXpra-org:masterfrom
andrewachen:fix/quic-substreams-master

Conversation

@andrewachen
Copy link
Copy Markdown
Contributor

@andrewachen andrewachen commented Apr 10, 2026

Summary

Route audio, video, and input to independent QUIC streams to prevent
head-of-line blocking, replacing the broken push-promise approach that was
disabled in May 2023 due to NoAvailablePushIDError (#3850).

Ported from my v6.4.x daily-driver branch where this has been running for several days as a daily driver via module overrides.

Cellular benchmark results (no audio pipe bypass — substreams + priority only):

Sound network jitter TCP QUIC single QUIC multi
stdev (run 1) 80ms 109ms 20ms
stdev (run 2) 98ms 157ms 17ms
max (run 1) 817ms 1249ms 193ms
max (run 2) 708ms 1906ms 147ms

Single-stream QUIC is worse than TCP on cellular — H3's single stream has
stricter in-order delivery than TCP's kernel-level buffering. Multi-stream
eliminates this entirely: 4-5x better than TCP, 6-8x better than QUIC-single.

On WiFi/LAN with sufficient bandwidth, all modes perform equivalently since the
congestion window never fills.

  • Raw QUIC streams replace push promises — each substream is a raw QUIC
    bidirectional stream identified by a "xpra:<type>\n" header prefix,
    allocated via _quic.get_next_available_stream_id() and send_stream_data().
    Push promises were fundamentally limited by the H3 push ID pool; raw streams
    have no such limit.
  • Per-stream parse state isolationParseState class (with __slots__)
    holds independent reassembly state per stream in the parse thread, preventing
    interleaving corruption when draw and sound data arrive simultaneously on
    different streams.
  • Stream priority reordering — aioquic iterates _streams in dict insertion
    order with no priority mechanism. When video bandwidth spikes (e.g. window
    resize, 2→100 Mbps), the video stream consumes the entire congestion window,
    starving audio and input. _prioritize_streams() reorders the dict after each
    allocation: control streams first, then input (key, pointer), audio (sound),
    and bulk data (draw) last.
  • Client→server substreams — keystrokes and pointer events bypass the main
    WebSocket write queue via dedicated QUIC streams, preventing input latency
    spikes when the queue is congested with clipboard or ack packets.
  • TLS error surfacing — override datagram_received() to catch TLS/OpenSSL
    exceptions and resolve the connected waiter immediately instead of hanging
    until timeout with no error message. Both fast-open and non-fast-open paths
    use asyncio.wait_for() with the connect timeout.
  • certifi CA fallback — for frozen Windows builds where the OS cert store
    isn't accessible to pyOpenSSL, falls back to certifi.where() and checks
    relative to the executable directory.
  • Zombie connection cleanupConnectionTerminated event handler,
    _close_dead_connection() on write failure, and 60s read timeout loop ensure
    the xpra disconnect chain fires reliably when a peer dies.
  • UDP buffer sizing — 4 MiB send/receive buffers reduce packet drops during
    Python GIL pauses.
  • HOL blocking benchmarktests/scripts/test_quic_hol.py connects as a
    real QUIC client and measures inter-arrival jitter with and without substreams
    enabled. Compares TCP, QUIC-single, and QUIC-multi in sequential or parallel
    (--parallel) mode.

Why raw QUIC streams instead of H3 push promises

The existing substream stub used send_push_promise(), which maps to HTTP/3
server push. This has two problems: (1) the push ID space is limited and
exhausted quickly, causing NoAvailablePushIDError after a few streams, and
(2) push promises are semantically wrong — they're meant for speculative
resource delivery, not bidirectional data channels. Raw QUIC streams have none
of these limitations and are what the QUIC transport was designed for.

Why WS framing is stripped in do_write() instead of conditionally skipped

The WebSocket frame header (make_wsframe_header) is added by the format thread,
but the stream routing decision happens on the asyncio thread. A conditional
"don't add framing for substreams" check in the format thread would race with
the stream allocation on the asyncio thread. Instead, framing is always added
(no cross-thread coordination needed), and _strip_ws_header() removes it in
do_write() for substream writes, which is already on the asyncio thread.

Why accessing aioquic private attributes

_prioritize_streams() reorders _quic._streams, and the TLS error path reads
_connected_waiter and _close_event. aioquic provides no public API for stream
scheduling or error reporting during handshake. aioquic itself uses these
attributes the same way internally. The access is wrapped in try/except so a
future aioquic refactor degrades gracefully rather than crashing.

Configurable via environment variables:

  • XPRA_QUIC_SUBSTREAM_PACKET_TYPES — server→client (default: sound,draw)
  • XPRA_QUIC_CLIENT_SUBSTREAM_PACKET_TYPES — client→server (default: key-,pointer)

Addresses #3854. Partially addresses #4202 (stream priority is a first step
toward congestion-aware scheduling), #4663 (zombie cleanup and TLS error
surfacing fix silent connection drops), #3850 (TLS errors now surface
immediately instead of idle timeout).

🤖 Generated with Claude Code

Sponsored-By: Netflix

andrewachen and others added 3 commits April 10, 2026 13:55
The substream feature was disabled in May 2023 because HTTP/3 push
promises exhaust the limited push ID pool. Replace with raw QUIC
streams via _quic.get_next_available_stream_id() and send_stream_data().

Each substream is identified by a type prefix ("xpra:<type>\n"). Both
server and client intercept StreamDataReceived before H3. Stream
allocation is deferred to the asyncio loop. WS frame headers are
always added by the format thread and stripped in do_write() for
substreams, eliminating a cross-thread race condition.

Server: allocate substreams, register with listener, route writes.
  Write failure detection for zombie connections and ConnectionTerminated
  cleanup handler.
Client: detect server-initiated substreams, parse type prefix,
  route to put_raw_substream_data.
Listener: pre-H3 interception, substream registration, zombie cleanup.

Audio pipe bypass split to separate PR.

Enabled by default for sound and draw packet types.

Closes Xpra-org#3854 (substreams part)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
ParseState class (with __slots__) holds per-stream reassembly state in
the parse thread. Queue entries are tagged (stream_id, data) tuples.
Each QUIC stream gets independent header/payload/raw_packets state,
preventing interleaving corruption when draw and sound data arrive on
different streams.

Connection: _raw_read_cb callback delivers substream data to
substream_read_queue_put(), bypassing WebSocket framing. Read timeout
(60s) and _close_dead_connection() for zombie cleanup.

WebSocket protocol: wires _raw_read_cb to substream_read_queue_put
during init. WS framing always added by format thread (no race-prone
is_substream_packet check removed).

Audio pipe bypass split to separate PR.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
Substreams require quic.substreams in the hello exchange. Server starts
disabled and enables in hello_oked() when the client advertises support.

certifi CA bundle used as fallback when no ssl-ca-certs is specified,
fixing QUIC cert verification on Windows cx_Freeze builds.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
@andrewachen andrewachen force-pushed the fix/quic-substreams-master branch from 1c96991 to c5f0493 Compare April 10, 2026 22:54
@andrewachen andrewachen marked this pull request as ready for review April 10, 2026 23:04
@andrewachen andrewachen force-pushed the fix/quic-substreams-master branch from 2efa21a to c5f0493 Compare April 10, 2026 23:11
andrewachen and others added 5 commits April 10, 2026 22:57
Connects to a live xpra server via QUIC and measures inter-arrival
jitter of transport-level data chunks. Compares three modes:
  1. TCP (baseline)
  2. QUIC single-stream (no quic.substreams capability)
  3. QUIC multi-stream (with quic.substreams capability)

Supports sequential and parallel (--parallel) modes for A/B comparison,
with optional clock-boundary synchronization (--sync) for simultaneous
runs. Reports per-packet-type inter-arrival statistics (mean, p50, p95,
p99, max) and sound network jitter (arrival_diff - send_diff).

Draws are acked in all phases (pre-timing and timing) to prevent the
server's damage_ack_pending backlog from throttling output. For
multi-stream mode, a separate substream parser (_drain_substreams)
runs alongside the main parse thread — substream data goes to a
thread-safe buffer to avoid interleaving corruption in the single-state
parser. When running against a server with substream support but without
full client-side code, patches WebSocketClient with substream receive
at runtime.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
When connecting via QUIC with a bad trust store or certificate hostname
mismatch, aioquic's TLS verification exception escapes datagram_received
and is swallowed by asyncio's callback handler. The connection then hangs
until timeout with no indication of the actual TLS error.

Fix: override datagram_received in WebSocketClient to catch TLS/OpenSSL
exceptions, store them on _tls_error, and resolve the connected waiter
with the actual error. Both fast-open and non-fast-open paths now use
asyncio.wait_for(wait_connected(), timeout) so TLS errors surface
immediately. Errors are formatted as user-friendly messages with
appropriate SSL exit codes matching the TCP+SSL path.

Fast-open ordering: connect(transmit=False) -> open() -> wait_connected(),
because open()'s transmit() call is what sends the queued ClientHello.
Non-fast-open: connect() -> wait_connected() -> open().

Note: this accesses aioquic private attributes (_connected_waiter,
_close_event) because there is no public API to resolve the handshake
waiter with an error or to read the QUIC close event. aioquic itself
uses _connected_waiter the same way in _process_events(). If aioquic
adds a public error reporting API in the future, this should be updated.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
Keystrokes and pointer events now bypass the main WebSocket write queue
by using separate QUIC streams, preventing input latency spikes when
the queue is congested with clipboard, file transfer, or ack packets.

Refactors substream send logic (allocation, write dispatch, WS header
stripping) from ServerWebSocketConnection into the XpraQuicConnection
base class so both server and client connections can use it.

Client: configures key-/pointer prefixes via CLIENT_SUBSTREAM_PACKET_TYPES
env var, pre-allocates streams eagerly after server hello to avoid
first-keystroke delay. Enabled when server advertises quic.substreams.

Server: HttpServerProtocol detects client-initiated bidirectional QUIC
streams by parsing the "xpra:<type>\n" header prefix, then registers
them for direct routing that bypasses H3.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
aioquic iterates its _streams dict in insertion order with no priority
mechanism. When video bandwidth spikes during window resize (2->100 Mbps),
the video stream consumes the entire congestion window, starving audio.
This was initially suspected to be GIL contention from pycuda, but
profiling the pycuda C source showed that memcpy/synchronize operations
already release the GIL -- the starvation is purely at the QUIC layer.

Add _prioritize_streams() to XpraQuicConnection that reorders the
aioquic _streams dict after each substream allocation. Priority order:
  1. Main/control streams (always first)
  2. Input streams (key, pointer)
  3. Audio (sound, webcam)
  4. Bulk data (draw)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
Larger send/receive buffers (4 MiB default) reduce packet drops during
Python GIL pauses when video bandwidth spikes. Configurable via
XPRA_UDP_BUFFER_SIZE environment variable.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sponsored-By: Netflix
@andrewachen andrewachen force-pushed the fix/quic-substreams-master branch from c5f0493 to 4103962 Compare April 11, 2026 05:58
@totaam
Copy link
Copy Markdown
Collaborator

totaam commented Apr 11, 2026

I'm going to need time to review this one as this is quite involved.

I was actually in the process of doing some work on QUIC: #4306 and #3854 - I added 2439782 as a stepping stone towards handling audio using datagrams rather than separate streams.

@andrewachen
Copy link
Copy Markdown
Contributor Author

No worries — this ended up being more involved than I expected to get good performance on cellular.

Makes sense on datagrams for audio — the stream infrastructure in this PR is still needed for draw and input prioritization which require reliable, ordered delivery.

One thing worth considering on the datagrams-for-audio tradeoff: we also have an adaptive jitter buffer with WSOLA time-stretching (libsonic/SoundTouch) that absorbs retransmit delays gracefully — it slightly speeds up or slows down playback (±2.5%) to smooth jitter instead of underrunning. With that in place, stream retransmit delay becomes a non-issue for audio, while datagram loss produces gaps that can't be recovered without PLC. On LAN it doesn't matter, but on WiFi/cellular where the congestion window fills and datagrams get dropped, streams + tempo may actually produce better audio than datagrams + silence gaps.

Should I open a PR for the time-stretching? For my use case, slightly stretched audio is less distracting than dropouts.

@totaam
Copy link
Copy Markdown
Collaborator

totaam commented Apr 12, 2026

we also have an adaptive jitter buffer with WSOLA time-stretching (libsonic/SoundTouch) that absorbs retransmit delays gracefully

That's exactly what I've always wanted to implement but couldn't figure out how with gstreamer.

Should I open a PR for the time-stretching?

Oh! Yes please.

For my use case, slightly stretched audio is less distracting than dropouts.

I think that this is true for most use-cases. And will definitely help keep audio running smoothly with low latency datagrams.

Copy link
Copy Markdown
Collaborator

@totaam totaam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've merged some minor parts of this huge change: f902d7c + 302d256


See also #4873 which may interfere with packet re-assembly.

FYI: I was actually thinking of going a different route with quic substreams and perhaps using a dedicated socket parser, without transport layer compression, without "raw-packets" support, without a lot of the cruft that has accumulated in there...
But maybe this is a better approach, the ParseState abstraction looks like it would be worth having.


0225911 looked worthwhile so I tried to fix the merge conflicts, but it doesn't work for me and I still hit the timeout with a self-signed cert:

2026-04-22 23:09:26,890 [3b3501892c6fc0b1] Error: 298, reason: self-signed certificate, frame_type: 6
2026-04-22 23:09:28,769 created unix domain sockets:
2026-04-22 23:09:28,769  '/run/user/1000/xpra/clients/fedora-251604'

Warning: failed to connect:
 failed to connect: self-signed certificate, connection timed out

What am I doing wrong?

Here is the patch I was using:

diff --git a/xpra/net/quic/client.py b/xpra/net/quic/client.py
index ac7e8f453a..093909dec8 100644
--- a/xpra/net/quic/client.py
+++ b/xpra/net/quic/client.py
@@ -1,10 +1,12 @@
 # This file is part of Xpra.
 # Copyright (C) 2022 Antoine Martin <antoine@xpra.org>
+# Copyright (C) 2026 Netflix, Inc.
 # Xpra is released under the terms of the GNU GPL v2, or, at your option, any
 # later version. See the file COPYING for details.
 
 import re
 import os
+import asyncio
 import socket
 import ipaddress
 from queue import SimpleQueue
@@ -44,6 +46,29 @@ log = Logger("quic")
 
 HttpConnection = H0Connection | H3Connection
 
+
+def format_tls_error(exc: Exception) -> tuple[int, str]:
+    """Map a TLS/OpenSSL exception to (ExitCode, user-friendly message)."""
+    try:
+        from OpenSSL.crypto import Error as OpenSSLError
+    except ImportError:
+        OpenSSLError = None
+    if OpenSSLError and isinstance(exc, OpenSSLError):
+        # OpenSSL errors are lists of (lib, func, reason) tuples
+        reasons = " ".join(r[2] for r in exc.args[0] if r[2]).strip()
+        if "no such file" in reasons:
+            return ExitCode.SSL_FAILURE, f"SSL CA certificate file not found: {reasons}"
+        if "hostname mismatch" in reasons:
+            return ExitCode.SSL_CERTIFICATE_VERIFY_FAILURE, f"SSL certificate hostname mismatch: {reasons}"
+        if "certificate verify failed" in reasons:
+            return ExitCode.SSL_CERTIFICATE_VERIFY_FAILURE, f"SSL certificate verify failed: {reasons}"
+        return ExitCode.SSL_FAILURE, f"SSL error: {reasons}"
+    msg = str(exc)
+    if not msg or msg == str(type(exc)):
+        return ExitCode.SSL_FAILURE, "SSL handshake failed"
+    return ExitCode.SSL_FAILURE, f"SSL handshake failed: {msg}"
+
+
 IPV6 = socket.has_ipv6 and envbool("XPRA_IPV6", True)
 PREFER_IPV6 = IPV6 and envbool("XPRA_PREFER_IPV6", POSIX)
 HOSTS_PREFER_IPV4 = os.environ.get("XPRA_HOSTS_PREFER_IPV4", "localhost,127.0.0.1").split(",")
@@ -189,13 +214,25 @@ class WebSocketClient(QuicConnectionProtocol):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
         self._http: HttpConnection | None = None
-        self._push_types: dict[str, int] = {}
         self._websockets: dict[int, ClientWebSocketConnection] = {}
+        self._tls_error: Exception | None = None
         if self._quic.configuration.alpn_protocols[0].startswith("hq-"):
             self._http = H0Connection(self._quic)
         else:
             self._http = H3Connection(self._quic)
 
+    def datagram_received(self, data, addr) -> None:
+        try:
+            super().datagram_received(data, addr)
+        except Exception as e:
+            log("datagram_received TLS error", exc_info=True)
+            self._tls_error = e
+            # resolve the connected waiter so we don't hang until timeout
+            if self._connected_waiter is not None:
+                waiter = self._connected_waiter
+                self._connected_waiter = None
+                waiter.set_exception(e)
+
     def open(self, host: str, port: int, path: str) -> ClientWebSocketConnection:
         log(f"open({host}, {port}, {path})")
         stream_id = self._quic.get_next_available_stream_id()
@@ -303,6 +340,31 @@ def _make_quic_configuration(ssl_cert: str, ssl_key: str, ssl_key_password: str,
         max_datagram_frame_size=MAX_DATAGRAM_FRAME_SIZE,
     )
     configuration.verify_mode = parse_ssl_verify_mode(ssl_server_verify_mode)
+    if not ssl_ca_certs or ssl_ca_certs == "default":
+        # "default" means "use OS cert store" — handled by ssl.load_default_certs()
+        # for TCP+SSL, but aioquic uses pyOpenSSL which has no equivalent.
+        # Find the CA bundle ourselves instead.
+        # certifi.where() may return a stale build-time path in cx_Freeze frozen
+        # builds, so also check relative to the executable.
+        ssl_ca_certs = ""
+        ca_candidates = []
+        try:
+            import certifi
+            ca_candidates.append(certifi.where())
+        except ImportError:
+            pass
+        # frozen build: cacert.pem is next to the executable in lib/certifi/
+        import sys
+        exe_dir = os.path.dirname(getattr(sys, "executable", ""))
+        if exe_dir:
+            ca_candidates.append(os.path.join(exe_dir, "lib", "certifi", "cacert.pem"))
+        for ca_path in ca_candidates:
+            if ca_path and os.path.exists(ca_path):
+                ssl_ca_certs = ca_path
+                log(f"using CA bundle: {ssl_ca_certs}")
+                break
+        else:
+            log("no CA bundle found, certificate verification may fail")
     if ssl_ca_certs:
         configuration.load_verify_locations(ssl_ca_certs)
     if ssl_cert:
@@ -338,42 +400,49 @@ def _quic_connect(create_protocol, host: str, port: int, path: str, fast_open: b
         addr = addr_info[4]  # ie: ('192.168.0.10', 10000)
         log(f"connecting from {pretty_socket(local_addr)} to {pretty_socket(addr)} with {fast_open=}")
         if fast_open:
+            # fast-open: queue handshake, then open() triggers transmit
+            # sending ClientHello + websocket headers together
             protocol.connect(addr, transmit=False)
+            conn = protocol.open(host, port, path)
         else:
             protocol.connect(addr)
+        from xpra.scripts.picker import CONNECT_TIMEOUT
         log("awaiting connected state")
         try:
-            if not fast_open:
-                await protocol.wait_connected()
-            else:
-                from xpra.scripts.picker import CONNECT_TIMEOUT
-                tl.call_later(CONNECT_TIMEOUT, verify_connected, protocol)
-            log(f"{protocol}.open({host!r}, {port}, {path!r})")
-            conn = protocol.open(host, port, path)
-            log(f"quic connection {conn}")
-            return conn
+            # check for TLS errors that arrived before we started waiting
+            if protocol._tls_error:
+                raise protocol._tls_error
+            await asyncio.wait_for(protocol.wait_connected(), timeout=CONNECT_TIMEOUT)
+        except asyncio.TimeoutError:
+            # check if a TLS error was captured before the timeout
+            if protocol._tls_error:
+                exit_code, msg = format_tls_error(protocol._tls_error)
+                raise InitExit(exit_code, msg) from None
+            raise InitExit(ExitCode.CONNECTION_FAILED, "connection timed out") from None
         except Exception as e:
             log("connect()", exc_info=True)
-            # try to get a more meaningful exception message:
-            einfo = str(e)
-            if not einfo:
-                quic_conn = getattr(protocol, "_quic", None)
-                if quic_conn:
-                    close_event = getattr(quic_conn, "_close_event", None)
-                    log(f"{close_event=}, {dir(close_event)}")
-                    if close_event:
-                        err = close_event.error_code
-                        msg = close_event.reason_phrase
-                        if err & QuicErrorCode.CRYPTO_ERROR:
-                            raise InitExit(ExitCode.CONNECTION_FAILED, msg)
-                        # if (err & 0xFF)==QuicErrorCode.CONNECTION_REFUSED:
-                        #    raise InitExit(ExitCode.CONNECTION_FAILED, msg)
-                        raise RuntimeError(close_event.reason_phrase) from None
-            raise RuntimeError(str(e)) from None
-
-    # protocol.close()
-    # await protocol.wait_closed()
-    # transport.close()
+            # prefer the TLS error captured by datagram_received
+            if protocol._tls_error:
+                exit_code, msg = format_tls_error(protocol._tls_error)
+                raise InitExit(exit_code, msg) from None
+            # check for server-initiated CRYPTO_ERROR in the close event
+            close_event = getattr(protocol._quic, "_close_event", None)
+            if close_event:
+                err = close_event.error_code
+                msg = close_event.reason_phrase
+                if err & QuicErrorCode.CRYPTO_ERROR:
+                    raise InitExit(ExitCode.SSL_CERTIFICATE_VERIFY_FAILURE,
+                                   msg or "SSL certificate verification rejected by server") from None
+                if msg:
+                    raise InitExit(ExitCode.CONNECTION_FAILED, msg) from None
+            exit_code, msg = format_tls_error(e)
+            raise InitExit(exit_code, msg) from None
+        if not fast_open:
+            log(f"{protocol}.open({host!r}, {port}, {path!r})")
+            conn = protocol.open(host, port, path)
+        log(f"websocket connection {conn}")
+        return conn
+
     if len(addresses) == 1:
         return tl.sync(connect, addresses[0])
 
@@ -401,10 +470,3 @@ def quic_connect(host: str, port: int, path: str, fast_open: bool,
         return client_class(QuicConnection(configuration=configuration))
 
     return _quic_connect(create_protocol, host, port, path, fast_open)
-
-
-def verify_connected(protocol) -> None:
-    # this is only useful for debugging,
-    # as we have no way to send a message to the main thread,
-    # I guess it may still show up in some debug log?
-    log("verify_connect(%s) connected=%s", protocol, protocol._connected)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants