Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions pentest/port-scanner/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# PortScan - 轻量级 macOS 端口扫描工具

纯 Python 标准库实现,无需安装任何依赖,专为 macOS 优化。

## 特性

- 零依赖,只用 Python 3 标准库
- 多线程并发扫描,速度快
- 支持自定义端口范围 / 列表 / Top 100 常用端口
- Banner 抓取(可选)
- 服务识别(80+ 常见端口)
- 彩色终端输出

## 快速开始

```bash
# 克隆或进入目录
cd pentest/port-scanner

# 赋予执行权限(可选)
chmod +x portscan.py

# 扫描 Top 100 常用端口(默认)
python3 portscan.py 192.168.1.1

# 扫描指定端口
python3 portscan.py example.com -p 80,443,8080

# 扫描端口范围
python3 portscan.py 10.0.0.1 -p 1-1024

# 开启 Banner 抓取
python3 portscan.py 192.168.1.1 -p top100 -b

# 全端口扫描(500线程,超时0.5s)
python3 portscan.py 192.168.1.1 -p 1-65535 -t 500 --timeout 0.5
```

## 参数说明

| 参数 | 说明 | 默认值 |
|------|------|--------|
| `target` | 目标 IP 或域名 | 必填 |
| `-p / --ports` | 端口:`80`、`80,443`、`1-1024`、`top100` | `top100` |
| `-t / --threads` | 线程数(1-1000) | `100` |
| `--timeout` | 连接超时(秒) | `1.0` |
| `-b / --banner` | 开启 Banner 抓取 | 关闭 |

## 内置 Top 100 端口

包含 FTP、SSH、HTTP/S、MySQL、Redis、MongoDB、Elasticsearch 等 100 个最常见服务端口。

## 注意事项

> 本工具仅供授权测试、安全研究和学习使用。
> 请勿对未授权目标进行扫描,遵守当地法律法规。
252 changes: 252 additions & 0 deletions pentest/port-scanner/portscan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""
PortScan - Lightweight macOS Port Scanner
Usage: python3 portscan.py <target> [options]
"""

import socket
import sys
import argparse
import threading
import time
import resource
from queue import Queue, Empty
from datetime import datetime

# Raised at startup; semaphore is created per-scan based on thread count
_connection_sem: threading.Semaphore = threading.Semaphore(10)


def _raise_fd_limit():
"""Raise the process file-descriptor soft limit to safe headroom on macOS."""
try:
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
target = min(hard, 2048)
if soft < target:
resource.setrlimit(resource.RLIMIT_NOFILE, (target, hard))
except Exception:
pass


# Common services mapping
COMMON_SERVICES = {
21: "FTP", 22: "SSH", 23: "Telnet", 25: "SMTP", 53: "DNS",
80: "HTTP", 110: "POP3", 111: "RPC", 135: "MSRPC", 139: "NetBIOS",
143: "IMAP", 443: "HTTPS", 445: "SMB", 465: "SMTPS", 587: "SMTP-TLS",
993: "IMAPS", 995: "POP3S", 1080: "SOCKS", 1433: "MSSQL", 1521: "Oracle",
2049: "NFS", 2181: "ZooKeeper", 3306: "MySQL", 3389: "RDP", 3690: "SVN",
4444: "Metasploit", 5432: "PostgreSQL", 5900: "VNC", 5984: "CouchDB",
6379: "Redis", 6443: "K8s-API", 7000: "Cassandra", 8080: "HTTP-Alt",
8443: "HTTPS-Alt", 8888: "Jupyter", 9200: "Elasticsearch", 9300: "ES-Cluster",
27017: "MongoDB", 27018: "MongoDB-shard", 28017: "MongoDB-HTTP",
}

# Top 100 common ports
TOP_100_PORTS = [
21, 22, 23, 25, 53, 80, 110, 111, 135, 139,
143, 443, 445, 465, 587, 993, 995, 1080, 1433, 1521,
2049, 2181, 3306, 3389, 3690, 4444, 5432, 5900, 5984, 6379,
6443, 7000, 8080, 8443, 8888, 9200, 9300, 27017, 27018, 28017,
20, 69, 79, 81, 82, 88, 102, 119, 194, 389,
500, 512, 513, 514, 515, 523, 548, 554, 631, 636,
873, 902, 992, 1194, 1234, 1337, 1723, 2000, 2222, 2375,
2376, 2888, 3000, 3001, 4000, 4001, 4040, 4848, 5000, 5001,
5601, 6000, 6001, 6080, 6588, 7001, 7002, 7070, 7443, 7474,
8000, 8008, 8009, 8081, 8161, 8181, 9000, 9001, 9090, 9999,
]


RESET = "\033[0m"
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
BOLD = "\033[1m"
DIM = "\033[2m"


def print_banner():
banner = f"""{CYAN}{BOLD}
____ _ ____
| _ \\ ___ _ __| |_/ ___| ___ __ _ _ __
| |_) / _ \\| '__| __\\___ \\ / __/ _` | '_ \\
| __/ (_) | | | |_ ___) | (_| (_| | | | |
|_| \\___/|_| \\__|____/ \\___\\__,_|_| |_|
{RESET}{DIM} Lightweight macOS Port Scanner{RESET}
"""
print(banner)


def resolve_target(target: str) -> str:
try:
ip = socket.gethostbyname(target)
if ip != target:
print(f"{DIM} Resolved {target} -> {ip}{RESET}")
return ip
except socket.gaierror:
print(f"{RED}[!] Cannot resolve host: {target}{RESET}")
sys.exit(1)


def grab_banner(ip: str, port: int, timeout: float) -> str:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
# Send HTTP-style probe for common ports
if port in (80, 8080, 8000, 8888):
s.send(b"HEAD / HTTP/1.0\r\n\r\n")
elif port == 22:
pass # SSH sends banner automatically
else:
s.send(b"\r\n")
banner = s.recv(1024).decode("utf-8", errors="ignore").strip()
# Return only the first line
return banner.split("\n")[0][:80] if banner else ""
except Exception:
return ""


def scan_port(ip: str, port: int, timeout: float, grab: bool, results: list, lock: threading.Lock):
with _connection_sem:
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
result = s.connect_ex((ip, port))
# scan socket is now CLOSED before we open banner socket
if result == 0:
service = COMMON_SERVICES.get(port, "unknown")
banner = grab_banner(ip, port, timeout) if grab else ""
with lock:
results.append((port, service, banner))
except Exception:
pass


def worker(ip: str, queue: Queue, timeout: float, grab: bool, results: list, lock: threading.Lock):
while True:
try:
port = queue.get(block=False)
except Empty:
break
scan_port(ip, port, timeout, grab, results, lock)
queue.task_done()


def parse_ports(port_arg: str) -> list:
"""Parse port argument: '80', '80,443', '1-1024', 'top100'"""
if port_arg == "top100":
return sorted(TOP_100_PORTS)
ports = []
for part in port_arg.split(","):
part = part.strip()
if "-" in part:
start, end = part.split("-", 1)
ports.extend(range(int(start), int(end) + 1))
else:
ports.append(int(part))
return sorted(set(ports))


def run_scan(target: str, ports: list, threads: int, timeout: float, grab: bool):
global _connection_sem
_raise_fd_limit()
# Keep concurrent sockets well below system fd limit.
# With banner grabbing each slot uses 1 socket at a time (scan closes before banner opens).
_connection_sem = threading.Semaphore(min(threads, 20))
ip = resolve_target(target)
print(f"\n{BOLD} Target :{RESET} {ip}")
print(f"{BOLD} Ports :{RESET} {len(ports)} port(s)")
print(f"{BOLD} Threads :{RESET} {threads}")
print(f"{BOLD} Timeout :{RESET} {timeout}s")
print(f"{BOLD} Started :{RESET} {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"\n{DIM} Scanning...{RESET}\n")

queue: Queue = Queue()
results: list = []
lock = threading.Lock()

for port in ports:
queue.put(port)

start_time = time.time()
thread_list = []
for _ in range(min(threads, len(ports))):
t = threading.Thread(target=worker, args=(ip, queue, timeout, grab, results, lock))
t.daemon = True
t.start()
thread_list.append(t)

queue.join()
elapsed = time.time() - start_time

# Sort results by port
results.sort(key=lambda x: x[0])

print(f" {'PORT':<8} {'STATE':<10} {'SERVICE':<16} {'BANNER'}")
print(f" {'-'*8} {'-'*10} {'-'*16} {'-'*40}")

if results:
for port, service, banner in results:
banner_str = f"{DIM}{banner}{RESET}" if banner else ""
print(f" {GREEN}{port:<8}{RESET} {'open':<10} {YELLOW}{service:<16}{RESET} {banner_str}")
else:
print(f" {DIM}No open ports found.{RESET}")

print(f"\n {DIM}Scanned {len(ports)} port(s) in {elapsed:.2f}s | {len(results)} open{RESET}\n")


def main():
print_banner()

parser = argparse.ArgumentParser(
description="Lightweight macOS Port Scanner",
formatter_class=argparse.RawTextHelpFormatter,
epilog="""
Examples:
python3 portscan.py 192.168.1.1
python3 portscan.py example.com -p 80,443,8080
python3 portscan.py 10.0.0.1 -p 1-1024 -t 200
python3 portscan.py 192.168.1.1 -p top100 -b
python3 portscan.py scanme.nmap.org -p 1-65535 -t 500 --timeout 0.5
"""
)
parser.add_argument("target", help="Target IP address or hostname")
parser.add_argument(
"-p", "--ports",
default="top100",
help="Ports to scan: '80', '80,443', '1-1024', 'top100' (default: top100)"
)
parser.add_argument(
"-t", "--threads",
type=int, default=20,
help="Number of threads (default: 20)"
)
parser.add_argument(
"--timeout",
type=float, default=1.0,
help="Connection timeout in seconds (default: 1.0)"
)
parser.add_argument(
"-b", "--banner",
action="store_true",
help="Enable banner grabbing"
)

args = parser.parse_args()

try:
ports = parse_ports(args.ports)
except ValueError as e:
print(f"{RED}[!] Invalid port specification: {e}{RESET}")
sys.exit(1)

if not 1 <= args.threads <= 1000:
print(f"{RED}[!] Threads must be between 1 and 1000{RESET}")
sys.exit(1)

run_scan(args.target, ports, args.threads, args.timeout, args.banner)


if __name__ == "__main__":
main()