32c8f4b156
Web-based TCP/UDP port checker with firewall/GFW diagnosis, PM2 deployment config, and Ubuntu one-click install script. Co-authored-by: Cursor <cursoragent@cursor.com>
208 lines
7.2 KiB
Python
208 lines
7.2 KiB
Python
"""端口检测与问题诊断模块"""
|
||
|
||
import socket
|
||
import subprocess
|
||
import time
|
||
from dataclasses import dataclass, field
|
||
from enum import Enum
|
||
|
||
|
||
class PortStatus(str, Enum):
    """Raw outcome of a single port probe.

    Members are also ``str`` instances, so they compare equal to (and
    serialise as) their lowercase string values.
    """

    # The probe succeeded: TCP connect completed or a UDP reply was received.
    OPEN = "open"
    # The probe was actively refused (ConnectionRefusedError).
    CLOSED = "closed"
    # The TCP probe timed out or the host was reported unreachable.
    FILTERED = "filtered"
    # No conclusion could be drawn (default state, silent UDP, other errors).
    UNKNOWN = "unknown"
|
||
|
||
|
||
class DiagnosisType(str, Enum):
    """High-level conclusion derived from a probe result.

    Members are also ``str`` instances and compare equal to their
    lowercase string values.
    """

    # The port accepted the probe.
    PORT_OPEN = "port_open"
    # The probe was refused: nothing is listening on the port.
    PORT_NOT_OPEN = "port_not_open"
    # The probe was filtered while the host itself answered ping.
    LIKELY_FIREWALL = "likely_firewall"
    # The probe was filtered and the host did not answer ping either.
    LIKELY_BLOCKED = "likely_blocked"
    # Probe inconclusive and the host did not answer ping.
    HOST_UNREACHABLE = "host_unreachable"
    # The host name could not be resolved.
    DNS_FAILED = "dns_failed"
    # Probe inconclusive although the host answered ping.
    UDP_AMBIGUOUS = "udp_ambiguous"
|
||
|
||
|
||
# User-facing explanation for every DiagnosisType member. The texts are
# runtime strings shown to the user and are looked up by diagnosis type.
DIAGNOSIS_MESSAGES: dict[DiagnosisType, str] = {
    DiagnosisType.PORT_OPEN: "端口开放,服务正常可达",
    DiagnosisType.PORT_NOT_OPEN: "端口未开放,目标主机上该端口无服务监听",
    DiagnosisType.LIKELY_FIREWALL: "端口可能被防火墙拦截,主机可达但端口无响应",
    DiagnosisType.LIKELY_BLOCKED: "端口可能被墙或上游网络过滤,连接超时",
    DiagnosisType.HOST_UNREACHABLE: "目标主机不可达,请检查 IP/域名是否正确",
    DiagnosisType.DNS_FAILED: "域名解析失败,请检查域名是否正确",
    DiagnosisType.UDP_AMBIGUOUS: "UDP 端口状态不确定,可能开放或被过滤(UDP 协议特性)",
}
|
||
|
||
|
||
@dataclass
class CheckResult:
    """Everything collected while checking one host/port/protocol triple.

    Only ``host``, ``port`` and ``protocol`` are required; the remaining
    fields start from neutral defaults and are filled in as the check
    progresses.
    """

    # --- request -----------------------------------------------------
    host: str
    port: int
    protocol: str

    # --- DNS / reachability ------------------------------------------
    # IP address the host resolved to ("" until resolution succeeds).
    resolved_ip: str = ""

    # --- probe outcome and conclusion --------------------------------
    port_status: PortStatus = PortStatus.UNKNOWN
    diagnosis: DiagnosisType = DiagnosisType.UDP_AMBIGUOUS
    # Human-readable text matching ``diagnosis``.
    diagnosis_message: str = ""

    # Whether the host answered an ICMP ping.
    host_reachable: bool = False
    # Whether name resolution succeeded.
    dns_ok: bool = False

    # Probe round-trip time in milliseconds, or None when not measured.
    latency_ms: float | None = None
    # Step-by-step log lines describing what each stage observed.
    details: list[str] = field(default_factory=list)
    # Wall-clock duration of the whole check in milliseconds.
    elapsed_ms: float = 0
|
||
|
||
|
||
def resolve_host(host: str) -> tuple[str | None, str | None]:
|
||
"""解析域名,返回 (ip, error)"""
|
||
try:
|
||
ip = socket.gethostbyname(host.strip())
|
||
return ip, None
|
||
except socket.gaierror as e:
|
||
return None, str(e)
|
||
|
||
|
||
def ping_host(host: str, timeout: int = 3) -> bool:
    """Report whether *host* answers a single ICMP echo request.

    Runs the system ``ping`` command with a count of one and a ``-W`` value
    of *timeout*; the subprocess itself is given two extra seconds before it
    is abandoned.

    Returns:
        True only when ``ping`` exits with status 0. A timeout, a missing
        ``ping`` binary or any other OS-level failure yields False.
    """
    command = ["ping", "-c", "1", "-W", str(timeout), host]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            timeout=timeout + 2,
        )
    except (subprocess.TimeoutExpired, OSError):
        # OSError also covers FileNotFoundError (no ping executable).
        return False
    return completed.returncode == 0
|
||
|
||
|
||
def check_tcp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]:
    """Probe a TCP port with a full connect and classify the outcome.

    Args:
        host: IPv4 address or host name to connect to.
        port: TCP port number.
        timeout: Seconds to wait for the connection to be established.

    Returns:
        ``(status, latency_ms, detail)``. ``latency_ms`` is the connect time
        rounded to two decimals when the port is open, otherwise None.
        ``detail`` is a human-readable description of what happened.
    """
    # Local import keeps the errno lookup self-contained in this function.
    import errno

    start = time.monotonic()
    # The context manager closes the socket on every exit path, including a
    # failure inside settimeout().
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(timeout)
        try:
            sock.connect((host, port))
        except ConnectionRefusedError:
            return PortStatus.CLOSED, None, "TCP 连接被拒绝 (Connection Refused)"
        except socket.timeout:
            return PortStatus.FILTERED, None, f"TCP 连接超时 ({timeout}s)"
        except OSError as e:
            # Use the platform's own EHOSTUNREACH value; the numeric code
            # differs between operating systems.
            if e.errno == errno.EHOSTUNREACH:
                return PortStatus.FILTERED, None, f"主机不可达: {e}"
            return PortStatus.UNKNOWN, None, f"TCP 检测异常: {e}"

        latency = (time.monotonic() - start) * 1000
        return PortStatus.OPEN, round(latency, 2), f"TCP 连接成功,耗时 {latency:.0f}ms"
|
||
|
||
|
||
def check_udp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]:
    """Probe a UDP port by sending one byte and waiting for any reaction.

    Args:
        host: IPv4 address or host name to send to.
        port: UDP port number.
        timeout: Seconds to wait for a reply.

    Returns:
        ``(status, latency_ms, detail)``. A reply means OPEN (with the
        round-trip time), a refused receive means CLOSED, and silence or any
        other socket error means UNKNOWN.
    """
    began = time.monotonic()
    probe = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    probe.settimeout(timeout)
    try:
        probe.connect((host, port))
        probe.send(b"\x00")

        # Only the receive step distinguishes "refused" and "silent"; errors
        # from connect/send above fall through to the generic handler below.
        try:
            probe.recv(1024)
        except ConnectionRefusedError:
            return PortStatus.CLOSED, None, "UDP 收到 ICMP Port Unreachable,端口未开放"
        except socket.timeout:
            return PortStatus.UNKNOWN, None, "UDP 无响应(开放或被过滤,无法确定)"

        elapsed = (time.monotonic() - began) * 1000
        return PortStatus.OPEN, round(elapsed, 2), f"UDP 收到响应,耗时 {elapsed:.0f}ms"
    except OSError as exc:
        return PortStatus.UNKNOWN, None, f"UDP 检测异常: {exc}"
    finally:
        probe.close()
|
||
|
||
|
||
def diagnose(
    port_status: PortStatus,
    protocol: str,
    host_reachable: bool,
    dns_ok: bool,
) -> tuple[DiagnosisType, str]:
    """Turn raw probe observations into a diagnosis and its message.

    Rules, in priority order:
      1. DNS failed                      -> DNS_FAILED
      2. port open                       -> PORT_OPEN
      3. port closed                     -> PORT_NOT_OPEN
      4. port filtered, host pingable    -> LIKELY_FIREWALL
         port filtered, host not pingable-> LIKELY_BLOCKED
      5. anything else, host not pingable-> HOST_UNREACHABLE
         anything else, host pingable    -> UDP_AMBIGUOUS

    ``protocol`` is part of the signature but does not influence the
    result: every protocol follows the same rules.

    Returns:
        ``(diagnosis, message)`` where ``message`` is the matching entry of
        ``DIAGNOSIS_MESSAGES``.
    """
    if not dns_ok:
        verdict = DiagnosisType.DNS_FAILED
    elif port_status == PortStatus.OPEN:
        verdict = DiagnosisType.PORT_OPEN
    elif port_status == PortStatus.CLOSED:
        verdict = DiagnosisType.PORT_NOT_OPEN
    elif port_status == PortStatus.FILTERED:
        if host_reachable:
            verdict = DiagnosisType.LIKELY_FIREWALL
        else:
            verdict = DiagnosisType.LIKELY_BLOCKED
    elif not host_reachable:
        verdict = DiagnosisType.HOST_UNREACHABLE
    else:
        # NOTE(review): an inconclusive non-UDP probe on a reachable host
        # also lands here and gets the UDP-worded message.
        verdict = DiagnosisType.UDP_AMBIGUOUS

    return verdict, DIAGNOSIS_MESSAGES[verdict]
|
||
|
||
|
||
def run_check(host: str, port: int, protocol: str) -> CheckResult:
    """Run the complete check: DNS resolution, ping, port probe, diagnosis.

    Args:
        host: Host name or IP address to check.
        port: Port number to probe.
        protocol: ``"tcp"`` for a TCP probe (case and surrounding whitespace
            are ignored); any other value is probed as UDP.

    Returns:
        A populated ``CheckResult``. When name resolution fails the result
        is returned early with a DNS_FAILED diagnosis and no probe is run.
    """
    start = time.monotonic()
    # Normalise once so the probe dispatch, the stored result and diagnose()
    # all see the same protocol string.
    proto = protocol.strip().lower()
    result = CheckResult(host=host, port=port, protocol=proto)

    # 1. DNS resolution
    resolved_ip, dns_error = resolve_host(host)
    # Decide on the missing IP rather than on the error text, so an empty
    # error string can never let a None address reach the later steps.
    if resolved_ip is None:
        result.dns_ok = False
        result.diagnosis = DiagnosisType.DNS_FAILED
        result.diagnosis_message = DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED]
        result.details.append(f"DNS 解析失败: {dns_error}")
        result.elapsed_ms = round((time.monotonic() - start) * 1000, 2)
        return result

    result.dns_ok = True
    result.resolved_ip = resolved_ip
    result.details.append(f"DNS 解析成功: {host} -> {resolved_ip}")

    # 2. Host reachability (ICMP ping against the resolved address)
    result.host_reachable = ping_host(resolved_ip)
    result.details.append(
        f"主机 Ping: {'可达' if result.host_reachable else '不可达或无响应'}"
    )

    # 3. Port probe
    if proto == "tcp":
        port_status, latency, detail = check_tcp(resolved_ip, port)
    else:
        port_status, latency, detail = check_udp(resolved_ip, port)
    result.port_status = port_status
    result.latency_ms = latency
    result.details.append(detail)

    # 4. Diagnosis
    result.diagnosis, result.diagnosis_message = diagnose(
        port_status, proto, result.host_reachable, result.dns_ok
    )
    result.elapsed_ms = round((time.monotonic() - start) * 1000, 2)

    return result
|