From 1ab82f994d451cd872de98ff4426ed93c9f5e71c Mon Sep 17 00:00:00 2001 From: Dennis Braun Date: Fri, 24 Apr 2026 23:03:25 +0200 Subject: [PATCH] fix: support IPv6 connection monitor targets --- app/modules/connection_monitor/probe.py | 164 +++-- .../modules/connection_monitor/test_probe.py | 565 +++++++++++++++++- tools/icmp_probe_helper.c | 364 ++++++++--- 3 files changed, 971 insertions(+), 122 deletions(-) diff --git a/app/modules/connection_monitor/probe.py b/app/modules/connection_monitor/probe.py index a46e78da..2430e007 100644 --- a/app/modules/connection_monitor/probe.py +++ b/app/modules/connection_monitor/probe.py @@ -25,6 +25,26 @@ class ProbeResult: method: str # "icmp" or "tcp" +def _resolve_all( + host: str, port: int | None, sock_type: int +) -> list[tuple[int, tuple]]: + """Resolve host with AF_UNSPEC, returning every usable (family, sockaddr). + + Raw sockets have no service, and glibc returns ``EAI_SERVICE`` when + ``port`` is numeric for ``SOCK_RAW``, so raw callers must pass ``None``. + """ + service: int | None = None if sock_type == socket.SOCK_RAW else port + try: + infos = socket.getaddrinfo(host, service, socket.AF_UNSPEC, sock_type) + except socket.gaierror: + return [] + return [ + (info[0], info[4]) + for info in infos + if info[0] in (socket.AF_INET, socket.AF_INET6) + ] + + class ProbeEngine: """Probes targets via ICMP or TCP with auto-detection.""" @@ -81,45 +101,74 @@ def probe(self, host: str, tcp_port: int = 443) -> ProbeResult: return self._tcp_probe(host, tcp_port) def _tcp_probe(self, host: str, port: int) -> ProbeResult: - """Measure TCP handshake latency.""" - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - sock.settimeout(PROBE_TIMEOUT_S) - try: - start = time.monotonic() - result_code = sock.connect_ex((host, port)) - elapsed = (time.monotonic() - start) * 1000 - if result_code == 0: - return ProbeResult( - latency_ms=round(elapsed, 2), timeout=False, method="tcp" - ) + """Measure TCP handshake latency, iterating over resolved addresses.""" + addresses = _resolve_all(host, port, socket.SOCK_STREAM) + if not addresses: return ProbeResult(latency_ms=None, timeout=True, method="tcp") - except (socket.timeout, OSError): - return ProbeResult(latency_ms=None, timeout=True, method="tcp") - finally: - sock.close() + for family, sockaddr in addresses: + try: + sock = socket.socket(family, socket.SOCK_STREAM) + except OSError: + continue + try: + sock.settimeout(PROBE_TIMEOUT_S) + start = time.monotonic() + result_code = sock.connect_ex(sockaddr) + elapsed = (time.monotonic() - start) * 1000 + if result_code == 0: + return ProbeResult( + latency_ms=round(elapsed, 2), + timeout=False, + method="tcp", + ) + except (socket.timeout, OSError): + pass + finally: + sock.close() + return ProbeResult(latency_ms=None, timeout=True, method="tcp") def _icmp_probe(self, host: str) -> ProbeResult: - """Send ICMP echo request and measure round-trip time.""" + """Send ICMP/ICMPv6 echo request and measure round-trip time. + + Iterates every resolved (family, sockaddr) so a dual-stack host is not + short-circuited to a timeout when the first usable family is broken — + e.g. IPv6 socket unsupported, IPv6 route missing, or sendto hitting + ENETUNREACH. Mirrors :meth:`_tcp_probe`. + """ helper_result = self._icmp_probe_with_helper(host) if helper_result is not None: return helper_result - try: - dest = socket.gethostbyname(host) - except socket.gaierror: + addresses = _resolve_all(host, None, socket.SOCK_RAW) + if not addresses: return ProbeResult(latency_ms=None, timeout=True, method="icmp") - sock = socket.socket( - socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP - ) + for family, sockaddr in addresses: + if family == socket.AF_INET6: + result = self._icmp_raw_v6(sockaddr) + else: + result = self._icmp_raw_v4(sockaddr) + if not result.timeout: + return result + return ProbeResult(latency_ms=None, timeout=True, method="icmp") + + def _icmp_raw_v4(self, sockaddr: tuple) -> ProbeResult: + """Raw-socket ICMPv4 echo. Requires CAP_NET_RAW or root.""" + try: + sock = socket.socket( + socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP + ) + except OSError: + return ProbeResult(latency_ms=None, timeout=True, method="icmp") sock.settimeout(PROBE_TIMEOUT_S) try: self._seq = (self._seq + 1) & 0xFFFF + ident = os.getpid() & 0xFFFF packet = self._build_icmp_packet( - seq=self._seq, ident=os.getpid() & 0xFFFF + seq=self._seq, ident=ident, type_=8 ) start = time.monotonic() - sock.sendto(packet, (dest, 0)) + sock.sendto(packet, (sockaddr[0], 0)) while True: remaining = PROBE_TIMEOUT_S - (time.monotonic() - start) if remaining <= 0: @@ -133,11 +182,52 @@ def _icmp_probe(self, host: str) -> ProbeResult: icmp_type, _, _, pkt_id, pkt_seq = struct.unpack( "!BBHHH", icmp_header ) - if ( - icmp_type == 0 - and pkt_id == (os.getpid() & 0xFFFF) - and pkt_seq == self._seq - ): + if icmp_type == 0 and pkt_id == ident and pkt_seq == self._seq: + elapsed = (time.monotonic() - start) * 1000 + return ProbeResult( + latency_ms=round(elapsed, 2), + timeout=False, + method="icmp", + ) + except (socket.timeout, OSError): + return ProbeResult(latency_ms=None, timeout=True, method="icmp") + finally: + sock.close() + + def _icmp_raw_v6(self, sockaddr: tuple) -> ProbeResult: + """Raw-socket ICMPv6 echo. The kernel computes the ICMPv6 checksum.""" + try: + sock = socket.socket( + socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6 + ) + except OSError: + return ProbeResult(latency_ms=None, timeout=True, method="icmp") + sock.settimeout(PROBE_TIMEOUT_S) + try: + self._seq = (self._seq + 1) & 0xFFFF + ident = os.getpid() & 0xFFFF + # ICMPv6 echo request: type=128, code=0. Kernel fills checksum. + packet = self._build_icmp_packet( + seq=self._seq, ident=ident, type_=128 + ) + start = time.monotonic() + sock.sendto(packet, sockaddr) + while True: + remaining = PROBE_TIMEOUT_S - (time.monotonic() - start) + if remaining <= 0: + return ProbeResult( + latency_ms=None, timeout=True, method="icmp" + ) + sock.settimeout(remaining) + data, _ = sock.recvfrom(1024) + # IPv6 raw sockets deliver only the ICMPv6 payload, no IP header + if len(data) < 8: + continue + icmp_type, _, _, pkt_id, pkt_seq = struct.unpack( + "!BBHHH", data[:8] + ) + # echo reply type=129 + if icmp_type == 129 and pkt_id == ident and pkt_seq == self._seq: elapsed = (time.monotonic() - start) * 1000 return ProbeResult( latency_ms=round(elapsed, 2), @@ -205,13 +295,17 @@ def _icmp_probe_with_helper(self, host: str) -> ProbeResult | None: return None @staticmethod - def _build_icmp_packet(seq: int, ident: int) -> bytes: - """Build ICMP echo request packet with checksum.""" - # Type 8 = echo request, code 0 - header = struct.pack("!BBHHH", 8, 0, 0, ident, seq) + def _build_icmp_packet(seq: int, ident: int, type_: int = 8) -> bytes: + """Build an ICMP/ICMPv6 echo request packet. + + ICMPv4 requires a software checksum; the Linux kernel writes the + ICMPv6 checksum on AF_INET6 raw sockets, so a zero checksum is fine. + """ + header = struct.pack("!BBHHH", type_, 0, 0, ident, seq) payload = b"\x00" * 32 - checksum = ProbeEngine._icmp_checksum(header + payload) - header = struct.pack("!BBHHH", 8, 0, checksum, ident, seq) + if type_ == 8: + checksum = ProbeEngine._icmp_checksum(header + payload) + header = struct.pack("!BBHHH", type_, 0, checksum, ident, seq) return header + payload @staticmethod diff --git a/tests/modules/connection_monitor/test_probe.py b/tests/modules/connection_monitor/test_probe.py index 45978f96..d9a7514c 100644 --- a/tests/modules/connection_monitor/test_probe.py +++ b/tests/modules/connection_monitor/test_probe.py @@ -2,12 +2,17 @@ import socket import subprocess +from pathlib import Path from unittest.mock import patch, MagicMock -import pytest from app.modules.connection_monitor.probe import ProbeEngine, ProbeResult +def _gai(family, sockaddr): + """Return a single getaddrinfo tuple for the given family/sockaddr.""" + return [(family, socket.SOCK_STREAM, 0, "", sockaddr)] + + class TestProbeResult: def test_success_result(self): r = ProbeResult(latency_ms=12.5, timeout=False, method="icmp") @@ -174,3 +179,561 @@ def test_icmp_timeout(self): result = engine.probe("1.1.1.1") assert result.timeout is True assert result.latency_ms is None + + +class TestIPv6Support: + """Regression coverage for issue #361 — IPv6 targets must be probable.""" + + def test_tcp_probe_uses_af_inet6_for_ipv6_literal(self): + """An IPv6 literal must produce an AF_INET6 TCP socket.""" + engine = ProbeEngine(method="tcp") + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=_gai(socket.AF_INET6, ("2606:4700:4700::1111", 443, 0, 0)), + ), patch("app.modules.connection_monitor.probe.socket.socket") as mock_cls: + mock_instance = MagicMock() + mock_cls.return_value = mock_instance + mock_instance.connect_ex.return_value = 0 + result = engine.probe("2606:4700:4700::1111", tcp_port=443) + assert mock_cls.call_args.args[0] == socket.AF_INET6 + assert mock_cls.call_args.args[1] == socket.SOCK_STREAM + sockaddr = mock_instance.connect_ex.call_args.args[0] + assert sockaddr[0] == "2606:4700:4700::1111" + assert sockaddr[1] == 443 + assert result.timeout is False + assert result.method == "tcp" + + def test_tcp_probe_uses_af_inet_for_ipv4_literal(self): + """IPv4 literals keep using AF_INET.""" + engine = ProbeEngine(method="tcp") + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=_gai(socket.AF_INET, ("1.1.1.1", 443)), + ), patch("app.modules.connection_monitor.probe.socket.socket") as mock_cls: + mock_instance = MagicMock() + mock_cls.return_value = mock_instance + mock_instance.connect_ex.return_value = 0 + engine.probe("1.1.1.1", tcp_port=443) + assert mock_cls.call_args.args[0] == socket.AF_INET + sockaddr = mock_instance.connect_ex.call_args.args[0] + assert sockaddr == ("1.1.1.1", 443) + + def test_tcp_probe_resolves_aaaa_only_hostname_to_ipv6(self): + """A hostname returning only AAAA records must use AF_INET6.""" + engine = ProbeEngine(method="tcp") + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=_gai(socket.AF_INET6, ("2001:db8::1", 443, 0, 0)), + ), patch("app.modules.connection_monitor.probe.socket.socket") as mock_cls: + mock_instance = MagicMock() + mock_cls.return_value = mock_instance + mock_instance.connect_ex.return_value = 0 + engine.probe("v6only.example.test", tcp_port=443) + assert mock_cls.call_args.args[0] == socket.AF_INET6 + sockaddr = mock_instance.connect_ex.call_args.args[0] + assert sockaddr[0] == "2001:db8::1" + + def test_tcp_probe_unresolvable_host_returns_clean_timeout(self): + """A getaddrinfo failure must produce a timeout result, not raise.""" + engine = ProbeEngine(method="tcp") + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + side_effect=socket.gaierror(socket.EAI_NONAME, "nodename nor servname provided"), + ): + result = engine.probe("definitely-not-resolvable.invalid", tcp_port=443) + assert result.timeout is True + assert result.latency_ms is None + assert result.method == "tcp" + + def test_icmp_helper_passes_ipv6_literal_unchanged(self): + """The helper subprocess must receive the IPv6 literal verbatim.""" + helper_check = subprocess.CompletedProcess( + args=["docsight-icmp-helper", "--check"], + returncode=0, + stdout="ok\n", + stderr="", + ) + helper_run = subprocess.CompletedProcess( + args=["docsight-icmp-helper", "2606:4700:4700::1111", "2000"], + returncode=0, + stdout="7.50\n", + stderr="", + ) + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=True + ), patch( + "app.modules.connection_monitor.probe.os.access", return_value=True + ), patch( + "app.modules.connection_monitor.probe.subprocess.run", + side_effect=[helper_check, helper_run], + ) as mock_run: + engine = ProbeEngine(method="auto") + result = engine.probe("2606:4700:4700::1111") + second_call = mock_run.call_args_list[1] + invoked_args = second_call.args[0] + assert invoked_args[1] == "2606:4700:4700::1111" + assert result.timeout is False + assert result.latency_ms == 7.5 + assert result.method == "icmp" + + def test_icmp_raw_fallback_uses_icmpv6_for_ipv6_target(self): + """When the helper is missing, ICMPv6 must be attempted for IPv6 hosts.""" + engine = ProbeEngine(method="icmp") + + opened: list[tuple] = [] + + def fake_socket(family, type_, proto): + opened.append((family, type_, proto)) + return MagicMock( + sendto=MagicMock(), + recvfrom=MagicMock(side_effect=socket.timeout), + close=MagicMock(), + settimeout=MagicMock(), + ) + + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=[( + socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6, "", + ("2606:4700:4700::1111", 0, 0, 0), + )], + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + result = engine.probe("2606:4700:4700::1111") + assert any( + entry[0] == socket.AF_INET6 and entry[2] == socket.IPPROTO_ICMPV6 + for entry in opened + ), f"expected ICMPv6 raw socket, opened={opened}" + assert result.method == "icmp" + assert result.timeout is True + + def test_icmp_unresolvable_host_returns_clean_timeout(self): + """A DNS failure on the ICMP path must not raise.""" + engine = ProbeEngine(method="icmp") + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + side_effect=socket.gaierror(socket.EAI_NONAME, "nodename nor servname provided"), + ): + result = engine.probe("definitely-not-resolvable.invalid") + assert result.timeout is True + assert result.latency_ms is None + assert result.method == "icmp" + + +class TestIPv6Regressions: + """Regression coverage for the Codex review blockers on the #361 fix.""" + + def test_icmp_raw_fallback_resolves_ipv4_literal_without_eai_service(self): + """Raw ICMP must resolve IPv4 literals without tripping EAI_SERVICE. + + Linux/glibc returns EAI_SERVICE when ``getaddrinfo`` is called with + ``port=0`` and ``SOCK_RAW`` — so the partial fix always fell straight to + timeout before opening any raw socket, regressing IPv4 too. + """ + opened: list[tuple] = [] + + def fake_socket(family, type_, proto): + opened.append((family, type_, proto)) + m = MagicMock() + m.recvfrom.side_effect = socket.timeout + return m + + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=False + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + engine = ProbeEngine(method="icmp") + engine.probe("1.1.1.1") + + assert any( + fam == socket.AF_INET + and type_ == socket.SOCK_RAW + and proto == socket.IPPROTO_ICMP + for fam, type_, proto in opened + ), f"expected AF_INET/IPPROTO_ICMP raw socket; opened={opened}" + + def test_icmp_raw_fallback_resolves_ipv6_literal_without_eai_service(self): + """Raw ICMPv6 must be reachable for an IPv6 literal via real getaddrinfo.""" + opened: list[tuple] = [] + + def fake_socket(family, type_, proto): + opened.append((family, type_, proto)) + m = MagicMock() + m.recvfrom.side_effect = socket.timeout + return m + + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=False + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + engine = ProbeEngine(method="icmp") + engine.probe("2606:4700:4700::1111") + + assert any( + fam == socket.AF_INET6 + and type_ == socket.SOCK_RAW + and proto == socket.IPPROTO_ICMPV6 + for fam, type_, proto in opened + ), f"expected AF_INET6/IPPROTO_ICMPV6 raw socket; opened={opened}" + + def test_tcp_probe_iterates_when_first_family_socket_creation_fails(self): + """TCP must try the next resolved address if the first socket fails.""" + engine = ProbeEngine(method="tcp") + infos = [ + (socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("2001:db8::1", 443, 0, 0)), + (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("198.51.100.1", 443)), + ] + created: list[tuple[int, int]] = [] + + def fake_socket(family, type_, *args, **kwargs): + created.append((family, type_)) + if family == socket.AF_INET6: + raise OSError(97, "Address family not supported by protocol") + m = MagicMock() + m.connect_ex.return_value = 0 + return m + + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + result = engine.probe("dualstack.example.test", tcp_port=443) + + assert result.timeout is False + assert result.method == "tcp" + families = [fam for fam, _ in created] + assert socket.AF_INET6 in families + assert socket.AF_INET in families + + def test_tcp_probe_iterates_when_first_family_connect_fails(self): + """TCP must try the next resolved address if the first connect_ex fails.""" + engine = ProbeEngine(method="tcp") + infos = [ + (socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("2001:db8::1", 443, 0, 0)), + (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("198.51.100.1", 443)), + ] + attempts: list[tuple[int, tuple]] = [] + + def fake_socket(family, type_, *args, **kwargs): + m = MagicMock() + + def connect_ex(sockaddr): + attempts.append((family, sockaddr)) + return 0 if family == socket.AF_INET else 111 # ECONNREFUSED on v6 + + m.connect_ex.side_effect = connect_ex + return m + + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + result = engine.probe("dualstack.example.test", tcp_port=443) + + assert result.timeout is False + assert result.method == "tcp" + assert len(attempts) == 2 + assert attempts[0][0] == socket.AF_INET6 + assert attempts[1][0] == socket.AF_INET + + def test_tcp_probe_socket_oserror_does_not_escape(self): + """Socket creation failure must return a clean timeout, never raise. + + Previously ``socket.socket`` sat outside the try/except, so an + unsupported first family escaped into the collector as + ``probe_method="error"``. + """ + engine = ProbeEngine(method="tcp") + infos = [ + (socket.AF_INET6, socket.SOCK_STREAM, 6, "", ("2001:db8::1", 443, 0, 0)), + ] + with patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=OSError(97, "Address family not supported by protocol"), + ): + result = engine.probe("v6only.example.test", tcp_port=443) + + assert result.timeout is True + assert result.latency_ms is None + assert result.method == "tcp" + + def test_icmp_helper_c_source_handles_ipv6(self): + """The setuid helper source must handle AF_INET6/ICMPv6 (not IPv4-only).""" + helper_src = ( + Path(__file__).resolve().parents[3] / "tools" / "icmp_probe_helper.c" + ) + src = helper_src.read_text() + assert "AF_UNSPEC" in src, "helper must resolve with AF_UNSPEC, not AF_INET" + assert "AF_INET6" in src, "helper must open AF_INET6 sockets" + assert "IPPROTO_ICMPV6" in src, "helper must speak IPPROTO_ICMPV6" + assert ( + "ICMP6_ECHO_REQUEST" in src or "128" in src + ), "helper must send ICMPv6 echo request (type 128)" + assert ( + "ICMP6_ECHO_REPLY" in src or "129" in src + ), "helper must parse ICMPv6 echo reply (type 129)" + + +class TestDualStackFallback: + """Codex gate 2: ICMP fallback must iterate addrinfo like TCP does. + + The partial fix resolved every (family, sockaddr) but then probed only + ``addresses[0]``. A dual-stack hostname whose first usable family was + IPv6 — e.g. IPv6 socket unsupported, sendto hits ENETUNREACH, or the + echo never replies — would time out instead of falling back to IPv4. + The C helper had the symmetrical bug in ``resolve_any``. + """ + + def test_icmp_raw_fallback_tries_ipv4_when_ipv6_socket_creation_fails(self): + """IPv6 raw socket creation OSError must not short-circuit IPv4.""" + infos = [ + ( + socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6, "", + ("2001:db8::1", 0, 0, 0), + ), + ( + socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP, "", + ("198.51.100.1", 0), + ), + ] + opened: list[tuple[int, int, int]] = [] + + def fake_socket(family, type_, proto): + opened.append((family, type_, proto)) + if family == socket.AF_INET6: + raise OSError(97, "Address family not supported by protocol") + m = MagicMock() + m.recvfrom.side_effect = socket.timeout + return m + + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=False + ), patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + engine = ProbeEngine(method="icmp") + result = engine.probe("dualstack.example.test") + + families = [fam for fam, _, _ in opened] + assert socket.AF_INET6 in families, ( + f"expected IPv6 attempt first; opened={opened}" + ) + assert socket.AF_INET in families, ( + f"expected IPv4 fallback after IPv6 socket error; opened={opened}" + ) + assert result.method == "icmp" + + def test_icmp_raw_fallback_tries_ipv4_when_ipv6_sendto_fails(self): + """ENETUNREACH on the IPv6 address must not short-circuit IPv4.""" + infos = [ + ( + socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6, "", + ("2001:db8::1", 0, 0, 0), + ), + ( + socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP, "", + ("198.51.100.1", 0), + ), + ] + opened: list[tuple[int, int, int]] = [] + + def fake_socket(family, type_, proto): + opened.append((family, type_, proto)) + m = MagicMock() + if family == socket.AF_INET6: + m.sendto.side_effect = OSError(101, "Network is unreachable") + else: + m.recvfrom.side_effect = socket.timeout + return m + + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=False + ), patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + engine = ProbeEngine(method="icmp") + result = engine.probe("dualstack.example.test") + + families = [fam for fam, _, _ in opened] + assert socket.AF_INET6 in families + assert socket.AF_INET in families, ( + f"expected IPv4 fallback after IPv6 sendto error; opened={opened}" + ) + assert result.method == "icmp" + + def test_icmp_raw_fallback_returns_ipv4_success_after_ipv6_timeout(self): + """When IPv6 times out with no reply, iterate to IPv4 and use its answer.""" + infos = [ + ( + socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6, "", + ("2001:db8::1", 0, 0, 0), + ), + ( + socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP, "", + ("198.51.100.1", 0), + ), + ] + attempts: list[tuple[str, tuple]] = [] + + def fake_v6(sockaddr): + attempts.append(("v6", sockaddr)) + return ProbeResult(latency_ms=None, timeout=True, method="icmp") + + def fake_v4(sockaddr): + attempts.append(("v4", sockaddr)) + return ProbeResult(latency_ms=4.2, timeout=False, method="icmp") + + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=False + ), patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ): + engine = ProbeEngine(method="icmp") + with patch.object(engine, "_icmp_raw_v6", side_effect=fake_v6), \ + patch.object(engine, "_icmp_raw_v4", side_effect=fake_v4): + result = engine.probe("dualstack.example.test") + + assert [a[0] for a in attempts] == ["v6", "v4"], ( + f"must iterate v6 then v4; got {attempts}" + ) + assert result.timeout is False + assert result.method == "icmp" + assert result.latency_ms == 4.2 + + def test_icmp_raw_fallback_all_addresses_failing_returns_timeout(self): + """Every address failing still produces a timeout ProbeResult (method=icmp).""" + infos = [ + ( + socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6, "", + ("2001:db8::1", 0, 0, 0), + ), + ( + socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP, "", + ("198.51.100.1", 0), + ), + ] + + def fake_socket(*_args, **_kwargs): + m = MagicMock() + m.recvfrom.side_effect = socket.timeout + return m + + with patch( + "app.modules.connection_monitor.probe.os.path.isfile", return_value=False + ), patch( + "app.modules.connection_monitor.probe.socket.getaddrinfo", + return_value=infos, + ), patch( + "app.modules.connection_monitor.probe.socket.socket", + side_effect=fake_socket, + ): + engine = ProbeEngine(method="icmp") + result = engine.probe("dualstack.example.test") + + assert result.timeout is True + assert result.latency_ms is None + assert result.method == "icmp" + + def test_icmp_helper_c_source_reserves_bounded_per_address_budget(self, tmp_path): + """Helper must bound each address's wait so a silent first address does + not starve later addresses of time. + + Compiles the helper and invokes a ``--plan `` + diagnostic mode that simulates the main loop's budget allocation + without needing raw-socket privileges. With N addresses and total T, + every address must receive a non-zero budget strictly less than T, + and the sum must not exceed T — otherwise a first address that sends + but never replies can consume the whole window and the second address + is never attempted. + """ + helper_src = ( + Path(__file__).resolve().parents[3] / "tools" / "icmp_probe_helper.c" + ) + binary = tmp_path / "icmp_probe_helper" + compile_result = subprocess.run( + ["gcc", "-O2", "-Wall", "-Werror", "-o", str(binary), str(helper_src)], + capture_output=True, + text=True, + ) + assert compile_result.returncode == 0, compile_result.stderr + + for total_ms, count in [(2000, 2), (3000, 3), (2000, 4)]: + plan = subprocess.run( + [str(binary), "--plan", str(total_ms), str(count)], + capture_output=True, + text=True, + timeout=5, + ) + assert plan.returncode == 0, plan.stderr + budgets = [int(x) for x in plan.stdout.split()] + assert len(budgets) == count, ( + f"--plan {total_ms} {count} must emit {count} budgets; got {budgets}" + ) + for i, budget in enumerate(budgets): + assert budget > 0, ( + f"address {i} of {count} got zero budget with total={total_ms}; " + f"a silent first address would starve later ones. budgets={budgets}" + ) + assert budget < total_ms, ( + f"address {i} budget must be strictly bounded below total; " + f"got {budget} with total={total_ms}. budgets={budgets}" + ) + assert sum(budgets) <= total_ms, ( + f"sum of per-address budgets must not exceed total={total_ms}; " + f"got sum={sum(budgets)} budgets={budgets}" + ) + + def test_icmp_helper_c_source_iterates_addrinfo_while_live(self): + """Helper must probe each address before freeing the addrinfo list. + + Pre-fix, ``resolve_any`` picked the first usable entry, freed the + list, and main sent exactly one packet to that address. After the + fix, ``sendto`` must run while the addrinfo list is still live — + i.e. at least one ``freeaddrinfo`` call appears AFTER ``sendto`` + in source order. That is the textual signature of a proper loop. + """ + import re + + helper_src = ( + Path(__file__).resolve().parents[3] / "tools" / "icmp_probe_helper.c" + ) + src = helper_src.read_text() + + getaddrinfo_calls = [m.start() for m in re.finditer(r"\bgetaddrinfo\s*\(", src)] + freeaddrinfo_calls = [m.start() for m in re.finditer(r"\bfreeaddrinfo\s*\(", src)] + sendto_calls = [m.start() for m in re.finditer(r"\bsendto\s*\(", src)] + + assert getaddrinfo_calls, "helper must still call getaddrinfo" + assert sendto_calls, "helper must still call sendto" + assert freeaddrinfo_calls, "helper must still call freeaddrinfo" + + earliest_sendto = min(sendto_calls) + assert any(pos > earliest_sendto for pos in freeaddrinfo_calls), ( + "freeaddrinfo must run AFTER sendto in source order; otherwise the " + "helper is picking one address, freeing the list, then probing — " + "which cannot iterate across address families." + ) diff --git a/tools/icmp_probe_helper.c b/tools/icmp_probe_helper.c index 855cf8cb..8b4f96e8 100644 --- a/tools/icmp_probe_helper.c +++ b/tools/icmp_probe_helper.c @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include #include #include @@ -30,117 +32,177 @@ static unsigned short icmp_checksum(const void *buf, int len) { return (unsigned short)(~sum); } -static int open_icmp_socket(void) { +static int open_icmp_socket(int family) { + if (family == AF_INET6) { + return socket(AF_INET6, SOCK_RAW, IPPROTO_ICMPV6); + } return socket(AF_INET, SOCK_RAW, IPPROTO_ICMP); } -static int resolve_ipv4(const char *host, struct sockaddr_in *addr) { - struct addrinfo hints; - struct addrinfo *result = NULL; - int rc; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_RAW; - hints.ai_protocol = IPPROTO_ICMP; - - rc = getaddrinfo(host, NULL, &hints, &result); - if (rc != 0) { - fprintf(stderr, "%s\n", gai_strerror(rc)); +static long elapsed_ms_since(const struct timeval *start) { + struct timeval now; + if (gettimeofday(&now, NULL) != 0) { return -1; } - - memcpy(addr, result->ai_addr, sizeof(*addr)); - freeaddrinfo(result); - return 0; + return (now.tv_sec - start->tv_sec) * 1000L + + (now.tv_usec - start->tv_usec) / 1000L; } -int main(int argc, char **argv) { - if (argc == 2 && strcmp(argv[1], "--check") == 0) { - int sock = open_icmp_socket(); - if (sock < 0) { - perror("socket"); - return 2; - } - close(sock); - puts("ok"); +/* Split the remaining overall budget evenly across the remaining addresses + * so a silent first address cannot starve later addresses of their wait. */ +static long per_address_budget_ms(long remaining_overall_ms, + int remaining_addresses) { + if (remaining_addresses <= 0 || remaining_overall_ms <= 0) { return 0; } + return remaining_overall_ms / remaining_addresses; +} - if (argc < 2 || argc > 3) { - fprintf(stderr, "usage: %s [--check] [timeout_ms]\n", argv[0]); - return 2; +static int build_icmp4_echo(unsigned char *buf, unsigned short ident, + unsigned short seq) { + memset(buf, 0, sizeof(struct icmphdr) + PAYLOAD_SIZE); + struct icmphdr *hdr = (struct icmphdr *)buf; + hdr->type = ICMP_ECHO; + hdr->code = 0; + hdr->un.echo.id = htons(ident); + hdr->un.echo.sequence = htons(seq); + hdr->checksum = icmp_checksum(buf, sizeof(struct icmphdr) + PAYLOAD_SIZE); + return (int)(sizeof(struct icmphdr) + PAYLOAD_SIZE); +} + +static int build_icmp6_echo(unsigned char *buf, unsigned short ident, + unsigned short seq) { + memset(buf, 0, sizeof(struct icmp6_hdr) + PAYLOAD_SIZE); + struct icmp6_hdr *hdr = (struct icmp6_hdr *)buf; + hdr->icmp6_type = ICMP6_ECHO_REQUEST; + hdr->icmp6_code = 0; + hdr->icmp6_id = htons(ident); + hdr->icmp6_seq = htons(seq); + /* Kernel fills ICMPv6 checksum for IPPROTO_ICMPV6 raw sockets. */ + hdr->icmp6_cksum = 0; + return (int)(sizeof(struct icmp6_hdr) + PAYLOAD_SIZE); +} + +static int match_icmp4_reply(const unsigned char *buf, ssize_t len, + unsigned short ident, unsigned short seq) { + /* AF_INET raw sockets receive the full IP datagram; skip the IP header. */ + if (len < (ssize_t)(sizeof(struct ip) + sizeof(struct icmphdr))) { + return 0; + } + const struct ip *ip_hdr = (const struct ip *)buf; + size_t ip_hlen = (size_t)ip_hdr->ip_hl * 4; + if ((size_t)len < ip_hlen + sizeof(struct icmphdr)) { + return 0; } + const struct icmphdr *reply = (const struct icmphdr *)(buf + ip_hlen); + return reply->type == ICMP_ECHOREPLY + && reply->un.echo.id == htons(ident) + && reply->un.echo.sequence == htons(seq); +} - const char *host = argv[1]; - int timeout_ms = DEFAULT_TIMEOUT_MS; - if (argc == 3) { - timeout_ms = atoi(argv[2]); - if (timeout_ms <= 0) { - fprintf(stderr, "invalid timeout_ms\n"); - return 2; - } +static int match_icmp6_reply(const unsigned char *buf, ssize_t len, + unsigned short ident, unsigned short seq) { + /* IPPROTO_ICMPV6 raw sockets deliver only the ICMPv6 message. */ + if (len < (ssize_t)sizeof(struct icmp6_hdr)) { + return 0; } + const struct icmp6_hdr *reply = (const struct icmp6_hdr *)buf; + return reply->icmp6_type == ICMP6_ECHO_REPLY + && reply->icmp6_id == htons(ident) + && reply->icmp6_seq == htons(seq); +} - int sock = open_icmp_socket(); - if (sock < 0) { +static int run_check(void) { + int sock4 = open_icmp_socket(AF_INET); + int sock6 = open_icmp_socket(AF_INET6); + if (sock4 < 0 && sock6 < 0) { perror("socket"); return 2; } + if (sock4 >= 0) { + close(sock4); + } + if (sock6 >= 0) { + close(sock6); + } + puts("ok"); + return 0; +} - struct sockaddr_in dest; - memset(&dest, 0, sizeof(dest)); - if (resolve_ipv4(host, &dest) != 0) { - close(sock); - return 2; +/* Attempt to probe one resolved address. Returns 0 on reply, + * 1 on sent-but-no-reply, -1 when this address could not be used + * (e.g. socket/sendto failed). latency_out is filled on success. + * + * The wait is bounded by min(per_address_budget_ms, total_budget_ms - elapsed): + * a silent first address must not consume the full overall budget, or later + * addresses in the addrinfo list would never be probed. */ +static int probe_address(const struct addrinfo *ai, + unsigned short ident, + unsigned short seq, + long attempt_budget_ms, + long total_budget_ms, + const struct timeval *overall_start, + double *latency_out) { + int sock = open_icmp_socket(ai->ai_family); + if (sock < 0) { + perror("socket"); + return -1; } - unsigned char packet[sizeof(struct icmphdr) + PAYLOAD_SIZE]; - memset(packet, 0, sizeof(packet)); - struct icmphdr *hdr = (struct icmphdr *)packet; - hdr->type = ICMP_ECHO; - hdr->code = 0; - hdr->un.echo.id = htons((unsigned short)(getpid() & 0xFFFF)); - hdr->un.echo.sequence = htons(1); - hdr->checksum = icmp_checksum(packet, sizeof(packet)); + unsigned char packet[sizeof(struct icmp6_hdr) + PAYLOAD_SIZE]; + int packet_len = (ai->ai_family == AF_INET6) + ? build_icmp6_echo(packet, ident, seq) + : build_icmp4_echo(packet, ident, seq); - struct timeval start; - if (gettimeofday(&start, NULL) != 0) { + struct timeval attempt_start; + if (gettimeofday(&attempt_start, NULL) != 0) { perror("gettimeofday"); close(sock); - return 2; + return -1; } - if (sendto(sock, packet, sizeof(packet), 0, (struct sockaddr *)&dest, sizeof(dest)) < 0) { + if (sendto(sock, packet, (size_t)packet_len, 0, + ai->ai_addr, ai->ai_addrlen) < 0) { perror("sendto"); close(sock); - return 2; + return -1; } for (;;) { - struct timeval now; - if (gettimeofday(&now, NULL) != 0) { + long elapsed_overall = elapsed_ms_since(overall_start); + if (elapsed_overall < 0) { perror("gettimeofday"); close(sock); - return 2; + return 1; + } + long remaining_overall = total_budget_ms - elapsed_overall; + if (remaining_overall <= 0) { + close(sock); + return 1; } - long elapsed_ms = (now.tv_sec - start.tv_sec) * 1000L - + (now.tv_usec - start.tv_usec) / 1000L; - long remaining_ms = timeout_ms - elapsed_ms; - if (remaining_ms <= 0) { - puts("TIMEOUT"); + long elapsed_attempt = elapsed_ms_since(&attempt_start); + if (elapsed_attempt < 0) { + perror("gettimeofday"); close(sock); return 1; } + long remaining_attempt = attempt_budget_ms - elapsed_attempt; + if (remaining_attempt <= 0) { + close(sock); + return 1; + } + + long wait_ms = remaining_overall < remaining_attempt + ? remaining_overall + : remaining_attempt; fd_set readfds; FD_ZERO(&readfds); FD_SET(sock, &readfds); - struct timeval tv; - tv.tv_sec = remaining_ms / 1000L; - tv.tv_usec = (remaining_ms % 1000L) * 1000L; + tv.tv_sec = wait_ms / 1000L; + tv.tv_usec = (wait_ms % 1000L) * 1000L; int ready = select(sock + 1, &readfds, NULL, NULL, &tv); if (ready < 0) { @@ -149,15 +211,14 @@ int main(int argc, char **argv) { } perror("select"); close(sock); - return 2; + return 1; } if (ready == 0) { - puts("TIMEOUT"); close(sock); return 1; } - unsigned char buffer[1024]; + unsigned char buffer[2048]; ssize_t received = recvfrom(sock, buffer, sizeof(buffer), 0, NULL, NULL); if (received < 0) { if (errno == EINTR) { @@ -165,28 +226,159 @@ int main(int argc, char **argv) { } perror("recvfrom"); close(sock); + return 1; + } + + int matches = (ai->ai_family == AF_INET6) + ? match_icmp6_reply(buffer, received, ident, seq) + : match_icmp4_reply(buffer, received, ident, seq); + if (!matches) { + continue; + } + + struct timeval end; + if (gettimeofday(&end, NULL) != 0) { + perror("gettimeofday"); + close(sock); + return 1; + } + *latency_out = (double)(end.tv_sec - attempt_start.tv_sec) * 1000.0 + + (double)(end.tv_usec - attempt_start.tv_usec) / 1000.0; + close(sock); + return 0; + } +} + +/* Diagnostic mode: simulate how the main loop would allocate per-address + * budgets for num_addresses, assuming each attempt consumes its full budget. + * Prints one budget per line. Used by tests to prove that no address is + * starved — without this, the Python test would have to run the real helper + * with raw-socket privileges against an unreachable host. */ +static int run_plan(long total_ms, int num_addresses) { + if (total_ms <= 0 || num_addresses <= 0) { + fprintf(stderr, "invalid plan args\n"); + return 2; + } + long simulated_elapsed = 0; + int remaining = num_addresses; + for (int i = 0; i < num_addresses; i++) { + long remaining_overall = total_ms - simulated_elapsed; + long budget = per_address_budget_ms(remaining_overall, remaining); + printf("%ld\n", budget); + simulated_elapsed += budget; + remaining--; + } + return 0; +} + +int main(int argc, char **argv) { + if (argc == 2 && strcmp(argv[1], "--check") == 0) { + return run_check(); + } + + if (argc == 4 && strcmp(argv[1], "--plan") == 0) { + return run_plan(atol(argv[2]), atoi(argv[3])); + } + + if (argc < 2 || argc > 3) { + fprintf(stderr, "usage: %s [--check] [timeout_ms]\n", argv[0]); + return 2; + } + + const char *host = argv[1]; + int timeout_ms = DEFAULT_TIMEOUT_MS; + if (argc == 3) { + timeout_ms = atoi(argv[2]); + if (timeout_ms <= 0) { + fprintf(stderr, "invalid timeout_ms\n"); return 2; } + } + + struct addrinfo hints; + struct addrinfo *result = NULL; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_RAW; + /* No service: raw sockets have no port, and Linux returns EAI_SERVICE + * if a numeric service is supplied with SOCK_RAW. */ + int rc = getaddrinfo(host, NULL, &hints, &result); + if (rc != 0) { + fprintf(stderr, "%s\n", gai_strerror(rc)); + return 2; + } - if (received < (ssize_t)(20 + sizeof(struct icmphdr))) { + struct timeval overall_start; + if (gettimeofday(&overall_start, NULL) != 0) { + perror("gettimeofday"); + freeaddrinfo(result); + return 2; + } + + unsigned short ident = (unsigned short)(getpid() & 0xFFFF); + unsigned short seq = 0; + int any_sent = 0; + int any_usable_family = 0; + + /* Pre-count usable addresses so per-address budgets can be allocated + * without giving the first address the entire overall window. */ + int remaining_addresses = 0; + for (struct addrinfo *ai = result; ai != NULL; ai = ai->ai_next) { + if (ai->ai_family == AF_INET || ai->ai_family == AF_INET6) { + remaining_addresses++; + } + } + + /* Iterate every resolved address, trying each family until one replies. + * Mirrors the Python TCP probe semantics: socket/sendto/timeout on the + * first address must not short-circuit the remaining addresses. */ + for (struct addrinfo *ai = result; ai != NULL; ai = ai->ai_next) { + if (ai->ai_family != AF_INET && ai->ai_family != AF_INET6) { continue; } + any_usable_family = 1; - struct icmphdr *reply = (struct icmphdr *)(buffer + 20); - if (reply->type == ICMP_ECHOREPLY - && reply->un.echo.id == hdr->un.echo.id - && reply->un.echo.sequence == hdr->un.echo.sequence) { - struct timeval end; - if (gettimeofday(&end, NULL) != 0) { - perror("gettimeofday"); - close(sock); - return 2; - } - double latency_ms = (double)(end.tv_sec - start.tv_sec) * 1000.0 - + (double)(end.tv_usec - start.tv_usec) / 1000.0; + long elapsed = elapsed_ms_since(&overall_start); + if (elapsed < 0) { + perror("gettimeofday"); + freeaddrinfo(result); + return 2; + } + long remaining_overall = timeout_ms - elapsed; + if (remaining_overall <= 0) { + break; + } + + long per_addr_budget = per_address_budget_ms( + remaining_overall, remaining_addresses); + remaining_addresses--; + if (per_addr_budget <= 0) { + continue; + } + + seq++; + double latency_ms = 0.0; + int probe_rc = probe_address(ai, ident, seq, per_addr_budget, + timeout_ms, &overall_start, &latency_ms); + if (probe_rc == 0) { printf("%.2f\n", latency_ms); - close(sock); + freeaddrinfo(result); return 0; } + if (probe_rc == 1) { + any_sent = 1; + } + /* probe_rc == -1: socket/sendto failure, try next address. */ + } + + freeaddrinfo(result); + + if (any_sent) { + puts("TIMEOUT"); + return 1; + } + if (!any_usable_family) { + fprintf(stderr, "no usable address\n"); } + return 2; }