"""端口检测与问题诊断模块""" import socket import subprocess import time from dataclasses import dataclass, field from enum import Enum class PortStatus(str, Enum): OPEN = "open" CLOSED = "closed" FILTERED = "filtered" UNKNOWN = "unknown" class DiagnosisType(str, Enum): PORT_OPEN = "port_open" PORT_NOT_OPEN = "port_not_open" LIKELY_FIREWALL = "likely_firewall" LIKELY_BLOCKED = "likely_blocked" HOST_UNREACHABLE = "host_unreachable" DNS_FAILED = "dns_failed" UDP_AMBIGUOUS = "udp_ambiguous" DIAGNOSIS_MESSAGES = { DiagnosisType.PORT_OPEN: "端口开放,服务正常可达", DiagnosisType.PORT_NOT_OPEN: "端口未开放,目标主机上该端口无服务监听", DiagnosisType.LIKELY_FIREWALL: "端口可能被防火墙拦截,主机可达但端口无响应", DiagnosisType.LIKELY_BLOCKED: "端口可能被墙或上游网络过滤,连接超时", DiagnosisType.HOST_UNREACHABLE: "目标主机不可达,请检查 IP/域名是否正确", DiagnosisType.DNS_FAILED: "域名解析失败,请检查域名是否正确", DiagnosisType.UDP_AMBIGUOUS: "UDP 端口状态不确定,可能开放或被过滤(UDP 协议特性)", } @dataclass class CheckResult: host: str port: int protocol: str resolved_ip: str = "" port_status: PortStatus = PortStatus.UNKNOWN diagnosis: DiagnosisType = DiagnosisType.UDP_AMBIGUOUS diagnosis_message: str = "" host_reachable: bool = False dns_ok: bool = False latency_ms: float | None = None details: list[str] = field(default_factory=list) elapsed_ms: float = 0 def resolve_host(host: str) -> tuple[str | None, str | None]: """解析域名,返回 (ip, error)""" try: ip = socket.gethostbyname(host.strip()) return ip, None except socket.gaierror as e: return None, str(e) def ping_host(host: str, timeout: int = 3) -> bool: """检测主机是否可达(ICMP ping)""" try: result = subprocess.run( ["ping", "-c", "1", "-W", str(timeout), host], capture_output=True, timeout=timeout + 2, ) return result.returncode == 0 except (subprocess.TimeoutExpired, FileNotFoundError, OSError): return False def check_tcp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]: """TCP 端口检测,返回 (状态, 延迟ms, 详情)""" start = time.monotonic() sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) try: sock.connect((host, port)) latency = (time.monotonic() - start) * 1000 return PortStatus.OPEN, round(latency, 2), f"TCP 连接成功,耗时 {latency:.0f}ms" except ConnectionRefusedError: return PortStatus.CLOSED, None, "TCP 连接被拒绝 (Connection Refused)" except socket.timeout: return PortStatus.FILTERED, None, f"TCP 连接超时 ({timeout}s)" except OSError as e: if e.errno in (113,): # EHOSTUNREACH return PortStatus.FILTERED, None, f"主机不可达: {e}" return PortStatus.UNKNOWN, None, f"TCP 检测异常: {e}" finally: sock.close() def check_udp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]: """UDP 端口检测,返回 (状态, 延迟ms, 详情)""" start = time.monotonic() sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) try: sock.connect((host, port)) sock.send(b"\x00") try: sock.recv(1024) latency = (time.monotonic() - start) * 1000 return PortStatus.OPEN, round(latency, 2), f"UDP 收到响应,耗时 {latency:.0f}ms" except ConnectionRefusedError: return PortStatus.CLOSED, None, "UDP 收到 ICMP Port Unreachable,端口未开放" except socket.timeout: return PortStatus.UNKNOWN, None, "UDP 无响应(开放或被过滤,无法确定)" except OSError as e: return PortStatus.UNKNOWN, None, f"UDP 检测异常: {e}" finally: sock.close() def diagnose( port_status: PortStatus, protocol: str, host_reachable: bool, dns_ok: bool, ) -> tuple[DiagnosisType, str]: """根据检测结果给出诊断结论""" if not dns_ok: return DiagnosisType.DNS_FAILED, DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED] if port_status == PortStatus.OPEN: return DiagnosisType.PORT_OPEN, DIAGNOSIS_MESSAGES[DiagnosisType.PORT_OPEN] if port_status == PortStatus.CLOSED: return DiagnosisType.PORT_NOT_OPEN, DIAGNOSIS_MESSAGES[DiagnosisType.PORT_NOT_OPEN] if port_status == PortStatus.FILTERED: if host_reachable: return ( DiagnosisType.LIKELY_FIREWALL, DIAGNOSIS_MESSAGES[DiagnosisType.LIKELY_FIREWALL], ) return ( DiagnosisType.LIKELY_BLOCKED, DIAGNOSIS_MESSAGES[DiagnosisType.LIKELY_BLOCKED], ) if protocol == "udp" and port_status == PortStatus.UNKNOWN: if not host_reachable: return ( DiagnosisType.HOST_UNREACHABLE, DIAGNOSIS_MESSAGES[DiagnosisType.HOST_UNREACHABLE], ) return DiagnosisType.UDP_AMBIGUOUS, DIAGNOSIS_MESSAGES[DiagnosisType.UDP_AMBIGUOUS] if not host_reachable: return ( DiagnosisType.HOST_UNREACHABLE, DIAGNOSIS_MESSAGES[DiagnosisType.HOST_UNREACHABLE], ) return DiagnosisType.UDP_AMBIGUOUS, DIAGNOSIS_MESSAGES[DiagnosisType.UDP_AMBIGUOUS] def run_check(host: str, port: int, protocol: str) -> CheckResult: """执行完整的端口检测流程""" start = time.monotonic() result = CheckResult(host=host, port=port, protocol=protocol.lower()) details: list[str] = [] # 1. DNS 解析 resolved_ip, dns_error = resolve_host(host) if dns_error: result.dns_ok = False result.diagnosis = DiagnosisType.DNS_FAILED result.diagnosis_message = DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED] result.details.append(f"DNS 解析失败: {dns_error}") result.elapsed_ms = round((time.monotonic() - start) * 1000, 2) return result result.dns_ok = True result.resolved_ip = resolved_ip # type: ignore[assignment] details.append(f"DNS 解析成功: {host} -> {resolved_ip}") # 2. 主机可达性 host_reachable = ping_host(resolved_ip) # type: ignore[arg-type] result.host_reachable = host_reachable details.append( f"主机 Ping: {'可达' if host_reachable else '不可达或无响应'}" ) # 3. 端口检测 if protocol.lower() == "tcp": port_status, latency, detail = check_tcp(resolved_ip, port) # type: ignore[arg-type] else: port_status, latency, detail = check_udp(resolved_ip, port) # type: ignore[arg-type] result.port_status = port_status result.latency_ms = latency details.append(detail) # 4. 诊断 diagnosis, message = diagnose(port_status, protocol, host_reachable, result.dns_ok) result.diagnosis = diagnosis result.diagnosis_message = message result.details = details result.elapsed_ms = round((time.monotonic() - start) * 1000, 2) return result