diff --git a/Cargo.toml b/Cargo.toml index 7ad3046..6393db1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ping-async" -version = "1.0.1" +version = "1.0.2" authors = ["hankbao "] license = "MIT" description = "Unprivileged Async Ping" @@ -17,7 +17,7 @@ futures = "0.3" static_assertions = "1.1.0" [target.'cfg(windows)'.dependencies.windows] -version = "0.61.3" +version = "0.62.2" features = [ "Win32_Foundation", "Win32_Networking_WinSock", @@ -30,8 +30,8 @@ features = [ [target.'cfg(any(target_os = "macos", target_os = "linux"))'.dependencies] rand = "0.9.2" -socket2 = "0.6.0" -tokio = { version = "1.47.0", features = [ +socket2 = "0.6.2" +tokio = { version = "1.49.0", features = [ "macros", "net", "rt", @@ -40,7 +40,7 @@ tokio = { version = "1.47.0", features = [ ] } [dev-dependencies] -tokio = { version = "1.47.0", features = [ +tokio = { version = "1.49.0", features = [ "macros", "rt-multi-thread", "time", diff --git a/src/icmp.rs b/src/icmp.rs index 77163d7..b471633 100644 --- a/src/icmp.rs +++ b/src/icmp.rs @@ -5,6 +5,32 @@ use std::convert::TryFrom; use std::io; use std::net::IpAddr; +use crate::IcmpEchoStatus; + +// ICMP error message type codes +const ICMPV4_DEST_UNREACHABLE: u8 = 3; +const ICMPV4_TIME_EXCEEDED: u8 = 11; +const ICMPV6_DEST_UNREACHABLE: u8 = 1; +const ICMPV6_TIME_EXCEEDED: u8 = 3; + +// ICMP echo request type codes (for verifying embedded packets) +const ICMPV4_ECHO_REQUEST: u8 = 8; +const ICMPV6_ECHO_REQUEST: u8 = 128; + +// IP protocol numbers +const IP_PROTO_ICMP: u8 = 1; +const IP_PROTO_ICMPV6: u8 = 58; + +// Fixed IPv6 header length +const IPV6_HEADER_LEN: usize = 40; + +/// Information extracted from an ICMP error message that embeds an original echo request. +pub struct IcmpErrorInfo { + pub identifier: u16, + pub sequence: u16, + pub status: IcmpEchoStatus, +} + /// ICMP message types for echo request/reply operations. /// /// This enum defines the standard ICMP type codes used for ping operations @@ -234,7 +260,7 @@ impl IcmpPacket { // IPv6 checksums are handled by the kernel if icmp_type != IcmpType::EchoRequestV4.as_u8() - || icmp_type != IcmpType::EchoReplyV4.as_u8() + && icmp_type != IcmpType::EchoReplyV4.as_u8() { return true; } @@ -300,11 +326,7 @@ impl IcmpPacket { /// Returns `Some(IcmpPacket)` if the data contains a valid echo reply for /// the target address, `None` otherwise. pub fn parse_reply(data: &[u8], target_addr: IpAddr) -> Option { - let icmp_offset = if cfg!(target_os = "macos") && target_addr.is_ipv4() { - 20 // Skip IP header for IPv4 on macOS - } else { - 0 - }; + let icmp_offset = Self::ipv4_header_offset(data, target_addr)?; if data.len() < icmp_offset + 8 { return None; // Not enough data @@ -321,6 +343,130 @@ impl IcmpPacket { Self::parse(icmp_data) } + /// Parse an ICMP error message (Destination Unreachable, Time Exceeded) that + /// embeds the original echo request, and extract the identifier and sequence. + /// + /// # Packet structure + /// + /// For IPv4 ICMP errors: + /// `[outer IP header (macOS only, variable IHL)] [ICMP error header (8)] [embedded IP header (variable IHL)] [embedded ICMP echo (8)]` + /// + /// For IPv6 ICMPv6 errors: + /// `[ICMPv6 error header (8)] [embedded IPv6 header (40)] [embedded ICMPv6 echo (8)]` + /// + /// # Known limitations + /// + /// IPv6 parsing assumes no extension headers between the embedded IPv6 header + /// and the ICMPv6 echo header. This is valid for typical echo requests. + pub fn parse_error_reply(data: &[u8], target_addr: IpAddr) -> Option { + let outer_offset = Self::ipv4_header_offset(data, target_addr)?; + + // Need at least the ICMP error header (8 bytes) + if data.len() < outer_offset + 8 { + return None; + } + + let icmp_data = &data[outer_offset..]; + let icmp_type = icmp_data[0]; + + // Determine status from ICMP error type + let status = if target_addr.is_ipv4() { + match icmp_type { + ICMPV4_DEST_UNREACHABLE => IcmpEchoStatus::Unreachable, + ICMPV4_TIME_EXCEEDED => IcmpEchoStatus::TimedOut, + _ => return None, + } + } else { + match icmp_type { + ICMPV6_DEST_UNREACHABLE => IcmpEchoStatus::Unreachable, + ICMPV6_TIME_EXCEEDED => IcmpEchoStatus::TimedOut, + _ => return None, + } + }; + + // Skip the 8-byte ICMP error header to reach the embedded original packet + let embedded = &icmp_data[8..]; + + if target_addr.is_ipv4() { + // Embedded IPv4 packet + if embedded.is_empty() { + return None; + } + let version = (embedded[0] >> 4) & 0x0F; + if version != 4 { + return None; + } + let embedded_ihl = (embedded[0] & 0x0F) as usize * 4; + if embedded_ihl < 20 { + return None; + } + // Need embedded IP header + 8 bytes of embedded ICMP echo header + if embedded.len() < embedded_ihl + 8 { + return None; + } + // Verify protocol is ICMP (byte 9 of IP header) + if embedded[9] != IP_PROTO_ICMP { + return None; + } + let embedded_icmp = &embedded[embedded_ihl..]; + // Verify embedded ICMP type is Echo Request + if embedded_icmp[0] != ICMPV4_ECHO_REQUEST { + return None; + } + let identifier = u16::from_be_bytes([embedded_icmp[4], embedded_icmp[5]]); + let sequence = u16::from_be_bytes([embedded_icmp[6], embedded_icmp[7]]); + Some(IcmpErrorInfo { + identifier, + sequence, + status, + }) + } else { + // Embedded IPv6 packet + if embedded.len() < IPV6_HEADER_LEN + 8 { + return None; + } + let version = (embedded[0] >> 4) & 0x0F; + if version != 6 { + return None; + } + // Next Header field at byte 6 + if embedded[6] != IP_PROTO_ICMPV6 { + return None; + } + let embedded_icmpv6 = &embedded[IPV6_HEADER_LEN..]; + // Verify embedded ICMPv6 type is Echo Request + if embedded_icmpv6[0] != ICMPV6_ECHO_REQUEST { + return None; + } + let identifier = u16::from_be_bytes([embedded_icmpv6[4], embedded_icmpv6[5]]); + let sequence = u16::from_be_bytes([embedded_icmpv6[6], embedded_icmpv6[7]]); + Some(IcmpErrorInfo { + identifier, + sequence, + status, + }) + } + } + + /// Returns the platform-specific outer IP header offset for the given data. + /// + /// On macOS, IPv4 DGRAM sockets include the IP header; the IHL is read from + /// the first nibble. On Linux and for IPv6, the offset is always 0. + fn ipv4_header_offset(data: &[u8], target_addr: IpAddr) -> Option { + if cfg!(target_os = "macos") && target_addr.is_ipv4() { + if data.is_empty() { + return None; + } + let ihl = (data[0] & 0x0F) as usize * 4; + if ihl < 20 { + return None; + } + Some(ihl) + } else { + Some(0) + } + } + /// Calculate Internet checksum (RFC 1071) /// Returns the 16-bit one's complement checksum fn calculate_checksum(data: &[u8]) -> u16 { @@ -483,7 +629,8 @@ mod tests { #[test] #[cfg(target_os = "macos")] fn test_parse_icmp_reply_macos() { - let ip_header = [0u8; 20]; // 20-byte IP header on macOS + let mut ip_header = [0u8; 20]; + ip_header[0] = 0x45; // version=4, IHL=5 (20 bytes) let icmp_data = [ 0, // Echo Reply 0, // Code @@ -567,4 +714,275 @@ mod tests { let target = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); assert!(IcmpPacket::parse_reply(&wrong_type_packet, target).is_none()); } + + /// Helper: build a minimal IPv4 header (20 bytes, IHL=5). + fn make_ipv4_header(protocol: u8) -> Vec { + let mut hdr = vec![0u8; 20]; + hdr[0] = 0x45; // version 4, IHL 5 (20 bytes) + hdr[9] = protocol; + hdr + } + + /// Helper: build an ICMP echo request header (8 bytes) with the given id/seq. + fn make_echo_request_v4(identifier: u16, sequence: u16) -> Vec { + let mut pkt = vec![0u8; 8]; + pkt[0] = ICMPV4_ECHO_REQUEST; // type + pkt[4..6].copy_from_slice(&identifier.to_be_bytes()); + pkt[6..8].copy_from_slice(&sequence.to_be_bytes()); + pkt + } + + /// Helper: build an ICMPv6 echo request header (8 bytes) with the given id/seq. + fn make_echo_request_v6(identifier: u16, sequence: u16) -> Vec { + let mut pkt = vec![0u8; 8]; + pkt[0] = ICMPV6_ECHO_REQUEST; // type + pkt[4..6].copy_from_slice(&identifier.to_be_bytes()); + pkt[6..8].copy_from_slice(&sequence.to_be_bytes()); + pkt + } + + #[test] + #[cfg(target_os = "linux")] + fn test_parse_error_reply_v4_dest_unreachable_linux() { + let target = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); + + // Build: [ICMP Dest Unreachable header (8)] [embedded IPv4 (20)] [embedded echo (8)] + let mut packet = Vec::new(); + // ICMP error header: type=3, code=1, checksum=0, unused=0 + packet.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 1, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + packet.extend_from_slice(&make_echo_request_v4(0xABCD, 0x0042)); + + let info = IcmpPacket::parse_error_reply(&packet, target).unwrap(); + assert_eq!(info.identifier, 0xABCD); + assert_eq!(info.sequence, 0x0042); + assert_eq!(info.status, IcmpEchoStatus::Unreachable); + } + + #[test] + #[cfg(target_os = "linux")] + fn test_parse_error_reply_v4_time_exceeded_linux() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + // ICMP error header: type=11 (Time Exceeded), code=0 + packet.extend_from_slice(&[ICMPV4_TIME_EXCEEDED, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + packet.extend_from_slice(&make_echo_request_v4(0x1234, 0x0007)); + + let info = IcmpPacket::parse_error_reply(&packet, target).unwrap(); + assert_eq!(info.identifier, 0x1234); + assert_eq!(info.sequence, 0x0007); + assert_eq!(info.status, IcmpEchoStatus::TimedOut); + } + + #[test] + fn test_parse_error_reply_v6_dest_unreachable() { + let target: IpAddr = "2001:db8::1".parse().unwrap(); + + // Build: [ICMPv6 error header (8)] [embedded IPv6 (40)] [embedded echo (8)] + let mut packet = Vec::new(); + // ICMPv6 error header: type=1 (Dest Unreachable), code=0 + packet.extend_from_slice(&[ICMPV6_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + // Embedded IPv6 header (40 bytes) + let mut ipv6_hdr = vec![0u8; 40]; + ipv6_hdr[0] = 0x60; // version 6 + ipv6_hdr[6] = IP_PROTO_ICMPV6; // Next Header + packet.extend_from_slice(&ipv6_hdr); + packet.extend_from_slice(&make_echo_request_v6(0x5678, 0x0003)); + + let info = IcmpPacket::parse_error_reply(&packet, target).unwrap(); + assert_eq!(info.identifier, 0x5678); + assert_eq!(info.sequence, 0x0003); + assert_eq!(info.status, IcmpEchoStatus::Unreachable); + } + + #[test] + fn test_parse_error_reply_v6_time_exceeded() { + let target: IpAddr = "fe80::1".parse().unwrap(); + + let mut packet = Vec::new(); + // ICMPv6 error header: type=3 (Time Exceeded), code=0 + packet.extend_from_slice(&[ICMPV6_TIME_EXCEEDED, 0, 0, 0, 0, 0, 0, 0]); + let mut ipv6_hdr = vec![0u8; 40]; + ipv6_hdr[0] = 0x60; + ipv6_hdr[6] = IP_PROTO_ICMPV6; + packet.extend_from_slice(&ipv6_hdr); + packet.extend_from_slice(&make_echo_request_v6(0x9999, 0x000A)); + + let info = IcmpPacket::parse_error_reply(&packet, target).unwrap(); + assert_eq!(info.identifier, 0x9999); + assert_eq!(info.sequence, 0x000A); + assert_eq!(info.status, IcmpEchoStatus::TimedOut); + } + + #[test] + #[cfg(target_os = "linux")] + fn test_parse_error_reply_wrong_protocol_linux() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + packet.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + // Embedded IP header with protocol=17 (UDP), not ICMP + packet.extend_from_slice(&make_ipv4_header(17)); + packet.extend_from_slice(&make_echo_request_v4(0x1234, 0x0001)); + + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + } + + #[test] + #[cfg(target_os = "linux")] + fn test_parse_error_reply_wrong_embedded_type_linux() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + packet.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + // Embedded ICMP with type=0 (Echo Reply) instead of Echo Request + let mut echo = make_echo_request_v4(0x1234, 0x0001); + echo[0] = 0; // Echo Reply + packet.extend_from_slice(&echo); + + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + } + + #[test] + #[cfg(target_os = "linux")] + fn test_parse_error_reply_truncated_v4_linux() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + // Too short — only the ICMP error header, no embedded packet + let packet = [ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]; + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + + // Truncated embedded IP header (only 10 bytes instead of 20+) + let mut packet2 = Vec::new(); + packet2.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + packet2.extend_from_slice(&[0x45, 0, 0, 0, 0, 0, 0, 0, 0, IP_PROTO_ICMP]); + assert!(IcmpPacket::parse_error_reply(&packet2, target).is_none()); + } + + #[test] + fn test_parse_error_reply_truncated_v6() { + let target_v6: IpAddr = "::1".parse().unwrap(); + let mut packet = Vec::new(); + packet.extend_from_slice(&[ICMPV6_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&[0x60; 20]); // Only 20 bytes, need 40+8 + assert!(IcmpPacket::parse_error_reply(&packet, target_v6).is_none()); + } + + #[test] + #[cfg(target_os = "linux")] + fn test_parse_error_reply_not_an_error_linux() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + // ICMP Echo Reply (type=0) is not an error message + let mut packet = Vec::new(); + packet.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); // type=0 (Echo Reply) + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + packet.extend_from_slice(&make_echo_request_v4(0x1234, 0x0001)); + + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_parse_error_reply_macos_with_outer_ip_header() { + let target = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)); + + // On macOS IPv4, the raw socket includes the outer IP header + let mut packet = Vec::new(); + // Outer IP header (20 bytes, IHL=5) + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + // ICMP Dest Unreachable header + packet.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 1, 0, 0, 0, 0, 0, 0]); + // Embedded IP header + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + // Embedded echo request + packet.extend_from_slice(&make_echo_request_v4(0xBEEF, 0x0010)); + + let info = IcmpPacket::parse_error_reply(&packet, target).unwrap(); + assert_eq!(info.identifier, 0xBEEF); + assert_eq!(info.sequence, 0x0010); + assert_eq!(info.status, IcmpEchoStatus::Unreachable); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_parse_error_reply_v4_time_exceeded_macos() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); // outer IP header + packet.extend_from_slice(&[ICMPV4_TIME_EXCEEDED, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + packet.extend_from_slice(&make_echo_request_v4(0x1234, 0x0007)); + + let info = IcmpPacket::parse_error_reply(&packet, target).unwrap(); + assert_eq!(info.identifier, 0x1234); + assert_eq!(info.sequence, 0x0007); + assert_eq!(info.status, IcmpEchoStatus::TimedOut); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_parse_error_reply_wrong_protocol_macos() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); // outer IP header + packet.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(17)); // embedded: UDP, not ICMP + packet.extend_from_slice(&make_echo_request_v4(0x1234, 0x0001)); + + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_parse_error_reply_wrong_embedded_type_macos() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); // outer IP header + packet.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + let mut echo = make_echo_request_v4(0x1234, 0x0001); + echo[0] = 0; // Echo Reply instead of Echo Request + packet.extend_from_slice(&echo); + + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_parse_error_reply_truncated_v4_macos() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + // Outer IP header only — no ICMP error header + let packet = make_ipv4_header(IP_PROTO_ICMP); + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + + // Outer IP header + ICMP error header but no embedded packet + let mut packet2 = Vec::new(); + packet2.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + packet2.extend_from_slice(&[ICMPV4_DEST_UNREACHABLE, 0, 0, 0, 0, 0, 0, 0]); + assert!(IcmpPacket::parse_error_reply(&packet2, target).is_none()); + } + + #[test] + #[cfg(target_os = "macos")] + fn test_parse_error_reply_not_an_error_macos() { + let target = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let mut packet = Vec::new(); + // outer IP header + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + // type=0 (Echo Reply) — not an error message + packet.extend_from_slice(&[0, 0, 0, 0, 0, 0, 0, 0]); + packet.extend_from_slice(&make_ipv4_header(IP_PROTO_ICMP)); + packet.extend_from_slice(&make_echo_request_v4(0x1234, 0x0001)); + + assert!(IcmpPacket::parse_error_reply(&packet, target).is_none()); + } } diff --git a/src/platform/socket.rs b/src/platform/socket.rs index 0bcfbaa..8dda2c6 100644 --- a/src/platform/socket.rs +++ b/src/platform/socket.rs @@ -19,11 +19,31 @@ use crate::{ type RequestRegistry = Arc>>>; +/// Persistent record of a fatal router error, stored so that all subsequent +/// `send()` calls fail fast with the same error information. +struct RouterError { + kind: io::ErrorKind, + message: String, +} + +impl RouterError { + fn from_io_error(e: &io::Error) -> Self { + RouterError { + kind: e.kind(), + message: e.to_string(), + } + } + + fn to_io_error(&self) -> io::Error { + io::Error::new(self.kind, self.message.clone()) + } +} + struct RouterContext { target_addr: IpAddr, socket: Arc, registry: RequestRegistry, - failed: Arc>>, + failed: Arc>>, } /// Requestor for sending ICMP Echo Requests (ping) and receiving replies on Unix systems. @@ -149,7 +169,7 @@ impl IcmpEchoRequestor { target_addr, socket: Arc::clone(&socket), registry: Arc::clone(®istry), - failed: Arc::new(Mutex::new(None)), + failed: Arc::new(Mutex::new(None::)), }; Ok(IcmpEchoRequestor { @@ -227,16 +247,15 @@ impl IcmpEchoRequestor { /// } /// ``` pub async fn send(&self) -> io::Result { - // Check if router failed already - if let Some(failed) = self + // Check if router failed already — error is persistent, not consumed + if let Some(ref router_error) = *self .inner .router_context .failed .lock() .unwrap_or_else(|poisoned| poisoned.into_inner()) - .take() { - return Err(failed); + return Err(router_error.to_io_error()); } // lazy spawning @@ -259,32 +278,35 @@ impl IcmpEchoRequestor { &payload, ); - let target = SocketAddr::new(self.inner.target_addr, 0); - let reply_rx = match self.inner.socket.send_to(packet.as_bytes(), target).await { - Ok(_) => { - let (tx, rx) = oneshot::channel(); - - self.inner - .registry - .lock() - .unwrap_or_else(|poisoned| poisoned.into_inner()) - .insert(key, tx); + // Register in the registry BEFORE sending so fast replies (e.g. loopback) + // are not dropped by the router due to a missing entry. + let (tx, reply_rx) = oneshot::channel(); + self.inner + .registry + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .insert(key, tx); - rx - } - Err(e) => match e.kind() { + let target = SocketAddr::new(self.inner.target_addr, 0); + if let Err(e) = self.inner.socket.send_to(packet.as_bytes(), target).await { + // Send failed — remove the registry entry we just inserted + self.inner + .registry + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .remove(&key); + + return match e.kind() { io::ErrorKind::NetworkUnreachable | io::ErrorKind::NetworkDown - | io::ErrorKind::HostUnreachable => { - return Ok(IcmpEchoReply::new( - self.inner.target_addr, - IcmpEchoStatus::Unreachable, - Duration::ZERO, - )); - } - _ => return Err(e), - }, - }; + | io::ErrorKind::HostUnreachable => Ok(IcmpEchoReply::new( + self.inner.target_addr, + IcmpEchoStatus::Unreachable, + Duration::ZERO, + )), + _ => Err(e), + }; + } let timeout = self.inner.timeout; let target_addr = self.inner.target_addr; @@ -294,8 +316,18 @@ impl IcmpEchoRequestor { match result { Ok(reply) => Ok(reply), Err(_) => { - // Channel closed - router probably failed - Err(io::Error::other("reply channel closed")) + // Channel closed — check if router failed for a consistent error + if let Some(ref router_error) = *self + .inner + .router_context + .failed + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + { + Err(router_error.to_io_error()) + } else { + Err(io::Error::other("reply channel closed")) + } } } } @@ -308,7 +340,7 @@ impl IcmpEchoRequestor { .duration_since(UNIX_EPOCH) .map_err(|e| io::Error::other(format!("timestamp error: {e}")))? .as_nanos() as u64; - let rtt = Duration::from_nanos(now - timestamp); + let rtt = Duration::from_nanos(now.saturating_sub(timestamp)); Ok(IcmpEchoReply::new( target_addr, @@ -352,7 +384,7 @@ async fn reply_router_loop( identifier: u16, socket: Arc, registry: RequestRegistry, - failed: Arc>>, + failed: Arc>>, ) { loop { let mut buf = vec![0u8; 1024]; @@ -399,6 +431,23 @@ async fn reply_router_loop( // Send reply to waiting thread let _ = sender.send(reply); } + } else if let Some(error_info) = IcmpPacket::parse_error_reply(&buf, target_addr) { + // ICMP error message (Dest Unreachable, Time Exceeded) with + // embedded echo request — match by identifier and sequence. + if error_info.identifier != identifier { + continue; + } + + let sender = registry + .lock() + .unwrap_or_else(|poisoned| poisoned.into_inner()) + .remove(&error_info.sequence); + + if let Some(sender) = sender { + let reply = + IcmpEchoReply::new(target_addr, error_info.status, Duration::ZERO); + let _ = sender.send(reply); + } } } Err(e) => { @@ -408,12 +457,15 @@ async fn reply_router_loop( io::ErrorKind::AddrNotAvailable | // Address no longer available io::ErrorKind::ConnectionAborted | // Socket forcibly closed io::ErrorKind::NotConnected => { // Socket disconnected - // Clear pending requests so they don't hang - registry.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).clear(); - - // Mark the failed flag + // Store the error persistently so all future send() calls fail fast let mut failed_lock = failed.lock().unwrap_or_else(|poisoned| poisoned.into_inner()); - *failed_lock = Some(e); + *failed_lock = Some(RouterError::from_io_error(&e)); + + // Drain the registry — dropping senders closes channels, which + // causes in-flight send() calls to see Canceled and check `failed` + // for a consistent error. Entries already removed by timeout are + // simply absent. + registry.lock().unwrap_or_else(|poisoned| poisoned.into_inner()).clear(); return; }