Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 129 additions & 35 deletions app/modules/connection_monitor/probe.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@ class ProbeResult:
method: str # "icmp" or "tcp"


def _resolve_all(
host: str, port: int | None, sock_type: int
) -> list[tuple[int, tuple]]:
"""Resolve host with AF_UNSPEC, returning every usable (family, sockaddr).

Raw sockets have no service, and glibc returns ``EAI_SERVICE`` when
``port`` is numeric for ``SOCK_RAW``, so raw callers must pass ``None``.
"""
service: int | None = None if sock_type == socket.SOCK_RAW else port
try:
infos = socket.getaddrinfo(host, service, socket.AF_UNSPEC, sock_type)
except socket.gaierror:
return []
return [
(info[0], info[4])
for info in infos
if info[0] in (socket.AF_INET, socket.AF_INET6)
]


class ProbeEngine:
"""Probes targets via ICMP or TCP with auto-detection."""

Expand Down Expand Up @@ -81,45 +101,74 @@ def probe(self, host: str, tcp_port: int = 443) -> ProbeResult:
return self._tcp_probe(host, tcp_port)

def _tcp_probe(self, host: str, port: int) -> ProbeResult:
"""Measure TCP handshake latency."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(PROBE_TIMEOUT_S)
try:
start = time.monotonic()
result_code = sock.connect_ex((host, port))
elapsed = (time.monotonic() - start) * 1000
if result_code == 0:
return ProbeResult(
latency_ms=round(elapsed, 2), timeout=False, method="tcp"
)
"""Measure TCP handshake latency, iterating over resolved addresses."""
addresses = _resolve_all(host, port, socket.SOCK_STREAM)
if not addresses:
return ProbeResult(latency_ms=None, timeout=True, method="tcp")
except (socket.timeout, OSError):
return ProbeResult(latency_ms=None, timeout=True, method="tcp")
finally:
sock.close()
for family, sockaddr in addresses:
try:
sock = socket.socket(family, socket.SOCK_STREAM)
except OSError:
continue
try:
sock.settimeout(PROBE_TIMEOUT_S)
start = time.monotonic()
result_code = sock.connect_ex(sockaddr)
elapsed = (time.monotonic() - start) * 1000
if result_code == 0:
return ProbeResult(
latency_ms=round(elapsed, 2),
timeout=False,
method="tcp",
)
except (socket.timeout, OSError):
pass
finally:
sock.close()
return ProbeResult(latency_ms=None, timeout=True, method="tcp")

def _icmp_probe(self, host: str) -> ProbeResult:
"""Send ICMP echo request and measure round-trip time."""
"""Send ICMP/ICMPv6 echo request and measure round-trip time.

Iterates every resolved (family, sockaddr) so a dual-stack host is not
short-circuited to a timeout when the first usable family is broken —
e.g. IPv6 socket unsupported, IPv6 route missing, or sendto hitting
ENETUNREACH. Mirrors :meth:`_tcp_probe`.
"""
helper_result = self._icmp_probe_with_helper(host)
if helper_result is not None:
return helper_result

try:
dest = socket.gethostbyname(host)
except socket.gaierror:
addresses = _resolve_all(host, None, socket.SOCK_RAW)
if not addresses:
return ProbeResult(latency_ms=None, timeout=True, method="icmp")

sock = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
)
for family, sockaddr in addresses:
if family == socket.AF_INET6:
result = self._icmp_raw_v6(sockaddr)
else:
result = self._icmp_raw_v4(sockaddr)
if not result.timeout:
return result
return ProbeResult(latency_ms=None, timeout=True, method="icmp")

def _icmp_raw_v4(self, sockaddr: tuple) -> ProbeResult:
"""Raw-socket ICMPv4 echo. Requires CAP_NET_RAW or root."""
try:
sock = socket.socket(
socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP
)
except OSError:
return ProbeResult(latency_ms=None, timeout=True, method="icmp")
sock.settimeout(PROBE_TIMEOUT_S)
try:
self._seq = (self._seq + 1) & 0xFFFF
ident = os.getpid() & 0xFFFF
packet = self._build_icmp_packet(
seq=self._seq, ident=os.getpid() & 0xFFFF
seq=self._seq, ident=ident, type_=8
)
start = time.monotonic()
sock.sendto(packet, (dest, 0))
sock.sendto(packet, (sockaddr[0], 0))
while True:
remaining = PROBE_TIMEOUT_S - (time.monotonic() - start)
if remaining <= 0:
Expand All @@ -133,11 +182,52 @@ def _icmp_probe(self, host: str) -> ProbeResult:
icmp_type, _, _, pkt_id, pkt_seq = struct.unpack(
"!BBHHH", icmp_header
)
if (
icmp_type == 0
and pkt_id == (os.getpid() & 0xFFFF)
and pkt_seq == self._seq
):
if icmp_type == 0 and pkt_id == ident and pkt_seq == self._seq:
elapsed = (time.monotonic() - start) * 1000
return ProbeResult(
latency_ms=round(elapsed, 2),
timeout=False,
method="icmp",
)
except (socket.timeout, OSError):
return ProbeResult(latency_ms=None, timeout=True, method="icmp")
finally:
sock.close()

def _icmp_raw_v6(self, sockaddr: tuple) -> ProbeResult:
"""Raw-socket ICMPv6 echo. The kernel computes the ICMPv6 checksum."""
try:
sock = socket.socket(
socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_ICMPV6
)
except OSError:
return ProbeResult(latency_ms=None, timeout=True, method="icmp")
sock.settimeout(PROBE_TIMEOUT_S)
try:
self._seq = (self._seq + 1) & 0xFFFF
ident = os.getpid() & 0xFFFF
# ICMPv6 echo request: type=128, code=0. Kernel fills checksum.
packet = self._build_icmp_packet(
seq=self._seq, ident=ident, type_=128
)
start = time.monotonic()
sock.sendto(packet, sockaddr)
while True:
remaining = PROBE_TIMEOUT_S - (time.monotonic() - start)
if remaining <= 0:
return ProbeResult(
latency_ms=None, timeout=True, method="icmp"
)
sock.settimeout(remaining)
data, _ = sock.recvfrom(1024)
# IPv6 raw sockets deliver only the ICMPv6 payload, no IP header
if len(data) < 8:
continue
icmp_type, _, _, pkt_id, pkt_seq = struct.unpack(
"!BBHHH", data[:8]
)
# echo reply type=129
if icmp_type == 129 and pkt_id == ident and pkt_seq == self._seq:
elapsed = (time.monotonic() - start) * 1000
return ProbeResult(
latency_ms=round(elapsed, 2),
Expand Down Expand Up @@ -205,13 +295,17 @@ def _icmp_probe_with_helper(self, host: str) -> ProbeResult | None:
return None

@staticmethod
def _build_icmp_packet(seq: int, ident: int) -> bytes:
"""Build ICMP echo request packet with checksum."""
# Type 8 = echo request, code 0
header = struct.pack("!BBHHH", 8, 0, 0, ident, seq)
def _build_icmp_packet(seq: int, ident: int, type_: int = 8) -> bytes:
"""Build an ICMP/ICMPv6 echo request packet.

ICMPv4 requires a software checksum; the Linux kernel writes the
ICMPv6 checksum on AF_INET6 raw sockets, so a zero checksum is fine.
"""
header = struct.pack("!BBHHH", type_, 0, 0, ident, seq)
payload = b"\x00" * 32
checksum = ProbeEngine._icmp_checksum(header + payload)
header = struct.pack("!BBHHH", 8, 0, checksum, ident, seq)
if type_ == 8:
checksum = ProbeEngine._icmp_checksum(header + payload)
header = struct.pack("!BBHHH", type_, 0, checksum, ident, seq)
return header + payload

@staticmethod
Expand Down
Loading
Loading