-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpinger.py
More file actions
97 lines (86 loc) · 3.31 KB
/
pinger.py
File metadata and controls
97 lines (86 loc) · 3.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import asyncio
import struct
import time
import socket
import os
import sys
# ICMP message type number placed in the first header byte of each outgoing
# echo ("ping") request.
ICMP_ECHO = 8
def check_root_permissions():
    """Report whether the process may create raw sockets.

    Returns:
        True when running as Administrator (Windows) or with an effective
        UID of 0 (other platforms); False otherwise, including when the
        Windows check itself cannot be performed.
    """
    if os.name == 'nt':  # Windows
        try:
            import ctypes
            return ctypes.windll.shell32.IsUserAnAdmin() != 0
        except Exception:
            # Best effort: treat any failure of the probe as "not admin".
            # Deliberately not a bare ``except`` so Ctrl-C / SystemExit
            # are never swallowed here.
            return False
    # Unix/Linux/Mac
    return os.geteuid() == 0
def checksum(data):
    """Compute the 16-bit ones'-complement checksum of *data*.

    The input is read as 16-bit words in which the FIRST byte of each pair
    is the low-order byte (little-endian). A trailing odd byte is treated as
    a word whose high byte is zero.

    Args:
        data: Bytes-like object (or sequence of ints in 0..255).

    Returns:
        The complemented, carry-folded sum as an int in 0..0xFFFF. Because
        words are read low byte first, the low byte of the result belongs at
        the even offset of the checksum field (i.e. pack it with ``'<H'``).
    """
    # Sum of little-endian words == sum(even bytes) + 256 * sum(odd bytes).
    total = sum(data[0::2]) + (sum(data[1::2]) << 8)
    # Fold the carries back in until the value fits in 16 bits. A fixed
    # two-step fold can leave a carry unfolded for very large inputs.
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF
_ICMP_ECHO_REPLY = 0  # ICMP message type of an echo reply


def _build_echo_request(ident, seq):
    """Return the wire bytes of an ICMP echo request for *ident* / *seq*."""
    payload = struct.pack('d', time.time())
    # The checksum is computed over the message with its checksum field zeroed.
    header = struct.pack('!BBHHH', ICMP_ECHO, 0, 0, ident, seq)
    chksum = checksum(header + payload)
    # checksum() sums 16-bit words low byte first, so its result has to be
    # written low byte first as well ('<H'), independent of host byte order.
    # (Passing it through htons() would put the two bytes on the wire swapped.)
    return header[:2] + struct.pack('<H', chksum) + header[4:] + payload


def _parse_echo_reply(data):
    """Return (source_addr_bytes, ident, seq) for an echo reply, else None."""
    if len(data) < 20:
        return None
    # Raw IPv4 sockets deliver the IP header too; its length is variable.
    ip_header_len = (data[0] & 0x0F) * 4
    if ip_header_len < 20 or len(data) < ip_header_len + 8:
        return None
    icmp_type, _code, _csum, ident, seq = struct.unpack_from(
        '!BBHHH', data, ip_header_len
    )
    if icmp_type != _ICMP_ECHO_REPLY:
        # E.g. our own echo request seen on loopback, or unrelated ICMP.
        return None
    return bytes(data[12:16]), ident, seq


async def _wait_for_echo_reply(loop, sock, expected, deadline):
    """Return True once a reply equal to *expected* arrives, False at *deadline*."""
    while True:
        # Use the time left until the deadline so unrelated ICMP traffic
        # cannot keep extending the wait.
        remaining = deadline - loop.time()
        if remaining <= 0:
            return False
        try:
            data = await asyncio.wait_for(loop.sock_recv(sock, 1024), remaining)
        except asyncio.TimeoutError:
            return False
        if _parse_echo_reply(data) == expected:
            return True


async def ping(host, count=4, timeout=1):
    """Send ICMP echo requests to *host* and print one result line per probe.

    Args:
        host: Host name or dotted-quad IPv4 address to probe.
        count: Number of echo requests to send.
        timeout: Seconds to wait for each reply before reporting a timeout.

    Returns:
        None. Results and errors are reported with ``print``; a host that
        cannot be resolved, a permission failure, or any other error ends
        the probe for that host without raising.
    """
    loop = asyncio.get_running_loop()
    try:
        # Resolve in a worker thread so a slow DNS lookup does not stall
        # the other concurrently running probes.
        dest = await loop.run_in_executor(None, socket.gethostbyname, host)
    except socket.gaierror:
        print(f"{host} not found")
        return

    ident = os.getpid() & 0xFFFF
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
        # Connecting a raw socket involves no handshake; it just fixes the
        # peer so that sock_sendall() has a destination.
        sock.connect((dest, 0))
        sock.setblocking(False)
        dest_packed = socket.inet_aton(dest)

        for seq in range(count):
            wire_seq = seq & 0xFFFF  # the ICMP sequence field is 16 bits wide
            packet = _build_echo_request(ident, wire_seq)
            sent_at = loop.time()
            await loop.sock_sendall(sock, packet)

            # Match on source address as well: concurrent probes share the
            # same pid-based identifier and sequence numbers.
            expected = (dest_packed, ident, wire_seq)
            if await _wait_for_echo_reply(loop, sock, expected, sent_at + timeout):
                rtt = (loop.time() - sent_at) * 1000
                print(f"{host} reply: seq={seq} rtt={rtt:.2f} ms")
            else:
                print(f"{host} request timed out")
            await asyncio.sleep(0.2)
    except PermissionError:
        print(f"[Error] Permission denied for {host}. This script requires root/admin privileges.")
    except Exception as e:
        print(f"[Error] Failed to ping {host}: {type(e).__name__}: {str(e)}")
    finally:
        # Always release the socket, whatever happened above.
        if sock is not None:
            sock.close()
async def main(hosts=None):
    """Ping a set of hosts concurrently after verifying privileges.

    Args:
        hosts: Optional iterable of host names / IPv4 addresses to probe.
            When omitted, a built-in list of public DNS resolvers is used.

    Exits the process with status 1 (via ``sys.exit``) when the privilege
    check fails.
    """
    # Check permissions before starting
    if not check_root_permissions():
        print("[Error] This script requires root/administrator privileges to create raw sockets.")
        if os.name == 'nt':
            print("[Info] On Windows: Run as Administrator")
        else:
            print("[Info] On Unix/Linux: Run with 'sudo python pinger.py'")
        sys.exit(1)

    targets = ["8.8.8.8", "1.1.1.1", "8.8.4.4"] if hosts is None else list(hosts)
    print(f"Pinging {len(targets)} hosts...\n")
    # One probe coroutine per host, all running concurrently.
    await asyncio.gather(*(ping(h) for h in targets))
if __name__ == "__main__":
    # Script entry point: run the async driver and translate the outcome
    # into a process exit status.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is treated as a normal, successful stop.
        print("\n[Info] Program interrupted by user.")
        raise SystemExit(0)
    except Exception as err:
        # Anything unexpected is reported on stderr and signalled as failure.
        print(f"\n[Fatal Error] {type(err).__name__}: {err}", file=sys.stderr)
        raise SystemExit(1)