Files
cpcheck/app/detector.py
T
dekun 32c8f4b156 Initial release: CPCHECK cloud port detection tool
Web-based TCP/UDP port checker with firewall/GFW diagnosis, PM2 deployment config, and Ubuntu one-click install script.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-16 15:24:40 +08:00

208 lines
7.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""端口检测与问题诊断模块"""
import socket
import subprocess
import time
from dataclasses import dataclass, field
from enum import Enum
class PortStatus(str, Enum):
OPEN = "open"
CLOSED = "closed"
FILTERED = "filtered"
UNKNOWN = "unknown"
class DiagnosisType(str, Enum):
PORT_OPEN = "port_open"
PORT_NOT_OPEN = "port_not_open"
LIKELY_FIREWALL = "likely_firewall"
LIKELY_BLOCKED = "likely_blocked"
HOST_UNREACHABLE = "host_unreachable"
DNS_FAILED = "dns_failed"
UDP_AMBIGUOUS = "udp_ambiguous"
DIAGNOSIS_MESSAGES = {
DiagnosisType.PORT_OPEN: "端口开放,服务正常可达",
DiagnosisType.PORT_NOT_OPEN: "端口未开放,目标主机上该端口无服务监听",
DiagnosisType.LIKELY_FIREWALL: "端口可能被防火墙拦截,主机可达但端口无响应",
DiagnosisType.LIKELY_BLOCKED: "端口可能被墙或上游网络过滤,连接超时",
DiagnosisType.HOST_UNREACHABLE: "目标主机不可达,请检查 IP/域名是否正确",
DiagnosisType.DNS_FAILED: "域名解析失败,请检查域名是否正确",
DiagnosisType.UDP_AMBIGUOUS: "UDP 端口状态不确定,可能开放或被过滤(UDP 协议特性)",
}
@dataclass
class CheckResult:
host: str
port: int
protocol: str
resolved_ip: str = ""
port_status: PortStatus = PortStatus.UNKNOWN
diagnosis: DiagnosisType = DiagnosisType.UDP_AMBIGUOUS
diagnosis_message: str = ""
host_reachable: bool = False
dns_ok: bool = False
latency_ms: float | None = None
details: list[str] = field(default_factory=list)
elapsed_ms: float = 0
def resolve_host(host: str) -> tuple[str | None, str | None]:
"""解析域名,返回 (ip, error)"""
try:
ip = socket.gethostbyname(host.strip())
return ip, None
except socket.gaierror as e:
return None, str(e)
def ping_host(host: str, timeout: int = 3) -> bool:
"""检测主机是否可达(ICMP ping"""
try:
result = subprocess.run(
["ping", "-c", "1", "-W", str(timeout), host],
capture_output=True,
timeout=timeout + 2,
)
return result.returncode == 0
except (subprocess.TimeoutExpired, FileNotFoundError, OSError):
return False
def check_tcp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]:
"""TCP 端口检测,返回 (状态, 延迟ms, 详情)"""
start = time.monotonic()
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
try:
sock.connect((host, port))
latency = (time.monotonic() - start) * 1000
return PortStatus.OPEN, round(latency, 2), f"TCP 连接成功,耗时 {latency:.0f}ms"
except ConnectionRefusedError:
return PortStatus.CLOSED, None, "TCP 连接被拒绝 (Connection Refused)"
except socket.timeout:
return PortStatus.FILTERED, None, f"TCP 连接超时 ({timeout}s)"
except OSError as e:
if e.errno in (113,): # EHOSTUNREACH
return PortStatus.FILTERED, None, f"主机不可达: {e}"
return PortStatus.UNKNOWN, None, f"TCP 检测异常: {e}"
finally:
sock.close()
def check_udp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]:
"""UDP 端口检测,返回 (状态, 延迟ms, 详情)"""
start = time.monotonic()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.settimeout(timeout)
try:
sock.connect((host, port))
sock.send(b"\x00")
try:
sock.recv(1024)
latency = (time.monotonic() - start) * 1000
return PortStatus.OPEN, round(latency, 2), f"UDP 收到响应,耗时 {latency:.0f}ms"
except ConnectionRefusedError:
return PortStatus.CLOSED, None, "UDP 收到 ICMP Port Unreachable,端口未开放"
except socket.timeout:
return PortStatus.UNKNOWN, None, "UDP 无响应(开放或被过滤,无法确定)"
except OSError as e:
return PortStatus.UNKNOWN, None, f"UDP 检测异常: {e}"
finally:
sock.close()
def diagnose(
port_status: PortStatus,
protocol: str,
host_reachable: bool,
dns_ok: bool,
) -> tuple[DiagnosisType, str]:
"""根据检测结果给出诊断结论"""
if not dns_ok:
return DiagnosisType.DNS_FAILED, DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED]
if port_status == PortStatus.OPEN:
return DiagnosisType.PORT_OPEN, DIAGNOSIS_MESSAGES[DiagnosisType.PORT_OPEN]
if port_status == PortStatus.CLOSED:
return DiagnosisType.PORT_NOT_OPEN, DIAGNOSIS_MESSAGES[DiagnosisType.PORT_NOT_OPEN]
if port_status == PortStatus.FILTERED:
if host_reachable:
return (
DiagnosisType.LIKELY_FIREWALL,
DIAGNOSIS_MESSAGES[DiagnosisType.LIKELY_FIREWALL],
)
return (
DiagnosisType.LIKELY_BLOCKED,
DIAGNOSIS_MESSAGES[DiagnosisType.LIKELY_BLOCKED],
)
if protocol == "udp" and port_status == PortStatus.UNKNOWN:
if not host_reachable:
return (
DiagnosisType.HOST_UNREACHABLE,
DIAGNOSIS_MESSAGES[DiagnosisType.HOST_UNREACHABLE],
)
return DiagnosisType.UDP_AMBIGUOUS, DIAGNOSIS_MESSAGES[DiagnosisType.UDP_AMBIGUOUS]
if not host_reachable:
return (
DiagnosisType.HOST_UNREACHABLE,
DIAGNOSIS_MESSAGES[DiagnosisType.HOST_UNREACHABLE],
)
return DiagnosisType.UDP_AMBIGUOUS, DIAGNOSIS_MESSAGES[DiagnosisType.UDP_AMBIGUOUS]
def run_check(host: str, port: int, protocol: str) -> CheckResult:
"""执行完整的端口检测流程"""
start = time.monotonic()
result = CheckResult(host=host, port=port, protocol=protocol.lower())
details: list[str] = []
# 1. DNS 解析
resolved_ip, dns_error = resolve_host(host)
if dns_error:
result.dns_ok = False
result.diagnosis = DiagnosisType.DNS_FAILED
result.diagnosis_message = DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED]
result.details.append(f"DNS 解析失败: {dns_error}")
result.elapsed_ms = round((time.monotonic() - start) * 1000, 2)
return result
result.dns_ok = True
result.resolved_ip = resolved_ip # type: ignore[assignment]
details.append(f"DNS 解析成功: {host} -> {resolved_ip}")
# 2. 主机可达性
host_reachable = ping_host(resolved_ip) # type: ignore[arg-type]
result.host_reachable = host_reachable
details.append(
f"主机 Ping: {'可达' if host_reachable else '不可达或无响应'}"
)
# 3. 端口检测
if protocol.lower() == "tcp":
port_status, latency, detail = check_tcp(resolved_ip, port) # type: ignore[arg-type]
else:
port_status, latency, detail = check_udp(resolved_ip, port) # type: ignore[arg-type]
result.port_status = port_status
result.latency_ms = latency
details.append(detail)
# 4. 诊断
diagnosis, message = diagnose(port_status, protocol, host_reachable, result.dns_ok)
result.diagnosis = diagnosis
result.diagnosis_message = message
result.details = details
result.elapsed_ms = round((time.monotonic() - start) * 1000, 2)
return result