diff --git a/dshell/plugins/quic/_helpers.py b/dshell/plugins/quic/_helpers.py new file mode 100644 index 0000000..3f28cd0 --- /dev/null +++ b/dshell/plugins/quic/_helpers.py @@ -0,0 +1,83 @@ +# SPDX-License-Identifier: BSD-3-Clause +""" +Minimal QUIC long-header helpers (pure-Python, CI-friendly). + +We only parse the QUIC *long header* preamble: +- header form (always "long" here) +- version (uint32) +- DCID (Destination Connection ID) +- SCID (Source Connection ID) + +RFC 9000 §17.2 layout (prefix): +[first byte][version(4)][dcid_len(1)][dcid][scid_len(1)][scid]... +We intentionally stop after SCID; fields after that vary by packet type. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class QuicLongHeaderMeta: + header_form: str # "long" + version: int # 32-bit version (0 for version negotiation) + dcid: bytes # Destination Connection ID + scid: bytes # Source Connection ID + + +def is_quic_long_header(data: bytes) -> bool: + """ + True if the buffer looks like a QUIC *long header* (MSB set) and is long enough + for [first byte][version][dcid_len] (>= 6 bytes). + """ + return len(data) >= 6 and (data[0] & 0x80) == 0x80 + + +def parse_quic_long_header(data: bytes) -> Optional[QuicLongHeaderMeta]: + """ + Parse the QUIC long header preamble and return QuicLongHeaderMeta, or None if invalid. + Safe to call on arbitrary UDP payloads. + """ + try: + if not is_quic_long_header(data): + return None + + # version: 4 bytes after first byte + version = int.from_bytes(data[1:5], "big") + + p = 5 + # DCID length + DCID + dcid_len = data[p] + p += 1 + if p + dcid_len > len(data): + return None + dcid = data[p : p + dcid_len] + p += dcid_len + + # SCID length + SCID + if p >= len(data): + return None + scid_len = data[p] + p += 1 + if p + scid_len > len(data): + return None + scid = data[p : p + scid_len] + + return QuicLongHeaderMeta( + header_form="long", + version=version, + dcid=dcid, + scid=scid, + ) + except Exception: + # Be defensive: never raise from helpers + return None + + +__all__ = [ + "QuicLongHeaderMeta", + "is_quic_long_header", + "parse_quic_long_header", +] diff --git a/dshell/plugins/quic/quic.py b/dshell/plugins/quic/quic.py new file mode 100644 index 0000000..d924128 --- /dev/null +++ b/dshell/plugins/quic/quic.py @@ -0,0 +1,129 @@ +# SPDX-License-Identifier: BSD-3-Clause +""" +QUIC (RFC 9000) decoder (long-header metadata) + +Parses QUIC *long header* preamble from UDP payloads to extract: +- version (uint32) +- DCID (Destination Connection ID) +- SCID (Source Connection ID) + +Defaults to UDP/443 but --ports can be used to add/override. +Never throws on malformed inputs; logs at debug and skips. + +Example (table): + decode -r traffic.pcap -p quic --ports 443,8443 + +Columns: + ts, src, sport, dst, dport, version, dcid, scid, cid_len +""" + +from __future__ import annotations + +# Framework import (kept only here so helpers remain importable without Dshell deps) +from dshell.decoder import Decoder # type: ignore + +# Import pure-Python helpers (no Dshell/pcap deps) +from ._helpers import ( + QuicLongHeaderMeta, + is_quic_long_header as _is_quic_long_header, + parse_quic_long_header as _parse_quic_long_header, +) + + +class quic(Decoder): # Dshell convention: class name == plugin name + """ + QUIC long-header metadata extractor. + Uses pure-Python helpers for safe parsing. + """ + + def __init__(self): + # Human-facing metadata shown by `decode -p quic -h` + Decoder.__init__( + self, + name="quic", + description="Extract QUIC long-header metadata (version, DCID, SCID)", + author="Akindotcome", + filter="udp", # BPF-level; we'll further filter by port(s) + ) + # Default port(s); allow override via --ports + self.ports = {443} + + # Output columns for table/CSV reporters + self.columns = ( + "ts", + "src", + "sport", + "dst", + "dport", + "version", + "dcid", + "scid", + "cid_len", + ) + + def options(self, opts): + """ + Add CLI options: --ports 443,8443 + """ + opts.add_option( + "ports", + "comma-separated UDP ports to consider as QUIC", + default="443", + ) + + def preparse(self, opts): + """ + Process CLI options before decoding starts. + """ + ports = set() + for tok in str(opts.ports).split(","): + tok = tok.strip() + if not tok: + continue + try: + ports.add(int(tok)) + except ValueError: + self.warn(f"Invalid port '{tok}', skipping") + if ports: + self.ports = ports + + def packet(self, pkt): + """ + Per-packet hook. `pkt` is Dshell’s packet object with helpers like: + - pkt.time + - pkt.ip, pkt.udp + - pkt.src, pkt.dst, pkt.sport, pkt.dport + - pkt.data (payload bytes) + If your Dshell version differs, map these accordingly. + """ + try: + # Only targeted UDP ports + if pkt.dport not in self.ports and pkt.sport not in self.ports: + return + + data = bytes(pkt.data or b"") + meta = _parse_quic_long_header(data) + if not meta: + return # not a QUIC long header + + # Render CIDs in hex for readability + dcid_hex = meta.dcid.hex() + scid_hex = meta.scid.hex() + cid_len = f"{len(meta.dcid)}/{len(meta.scid)}" + + # Emit a row (works with table/CSV/JSON reporters) + self.write( + ts=pkt.time, + src=pkt.src, + sport=pkt.sport, + dst=pkt.dst, + dport=pkt.dport, + version=meta.version, + dcid=dcid_hex, + scid=scid_hex, + cid_len=cid_len, + ) + except Exception as e: + # Be defensive; never take down the decode loop + self.debug(f"quic: decode error: {e!r}") + return diff --git a/tests/test_quic_helpers.py b/tests/test_quic_helpers.py new file mode 100644 index 0000000..d170e88 --- /dev/null +++ b/tests/test_quic_helpers.py @@ -0,0 +1,44 @@ +import importlib.util +import pathlib +import sys + +HELPERS = pathlib.Path(__file__).parents[1] / "dshell" / "plugins" / "quic" / "_helpers.py" +spec = importlib.util.spec_from_file_location("dshell_quic_helpers", str(HELPERS)) +mod = importlib.util.module_from_spec(spec) + +# IMPORTANT: register the module before executing so dataclasses/typing can resolve it +sys.modules[spec.name] = mod +spec.loader.exec_module(mod) + +# Pull symbols under test +is_quic_long_header = mod.is_quic_long_header +parse_quic_long_header = mod.parse_quic_long_header +QuicLongHeaderMeta = mod.QuicLongHeaderMeta + + +def test_quic_long_header_parsing_basic(): + first = bytes([0xC3]) # long-header bit set + version = (1).to_bytes(4, "big") + dcid = b"\x01\x02\x03\x04\x05\x06\x07\x08" + scid = b"\xAA\xBB\xCC\xDD" + blob = first + version + bytes([len(dcid)]) + dcid + bytes([len(scid)]) + scid + + assert is_quic_long_header(blob) is True + + meta = parse_quic_long_header(blob) + assert isinstance(meta, QuicLongHeaderMeta) + assert meta.header_form == "long" + assert meta.version == 1 + assert meta.dcid == dcid + assert meta.scid == scid + + +def test_not_quic_or_too_short(): + assert is_quic_long_header(b"\x10\x00") is False + assert parse_quic_long_header(b"\x10\x00") is None + + +def test_malformed_lengths(): + bad = bytes([0x80]) + b"\x00\x00\x00\x01" + bytes([50]) # dcid_len=50, exceeds buffer + assert is_quic_long_header(bad) is True + assert parse_quic_long_header(bad) is None