commit 32c8f4b156d8a5f654c9954a6bfdc81df9663cf4 Author: dekun Date: Tue Jun 16 15:24:40 2026 +0800 Initial release: CPCHECK cloud port detection tool Web-based TCP/UDP port checker with firewall/GFW diagnosis, PM2 deployment config, and Ubuntu one-click install script. Co-authored-by: Cursor diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..a1ddf26 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,8 @@ +* text=auto +*.sh text eol=lf +*.py text eol=lf +*.js text eol=lf +*.css text eol=lf +*.html text eol=lf +*.md text eol=lf +*.json text eol=lf diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..acf4721 --- /dev/null +++ b/.gitignore @@ -0,0 +1,13 @@ +venv/ +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +dist/ +build/ +.env +logs/ +*.log +.DS_Store +.idea/ +.vscode/ diff --git a/DEPLOY.md b/DEPLOY.md new file mode 100644 index 0000000..ee66b3f --- /dev/null +++ b/DEPLOY.md @@ -0,0 +1,250 @@ +# CPCHECK 部署文档 + +本文档详细说明如何在 Ubuntu 服务器上部署 CPCHECK 端口检测工具。 + +## 环境要求 + +| 项目 | 要求 | +|------|------| +| 操作系统 | Ubuntu 20.04 / 22.04 / 24.04 | +| 用户权限 | root | +| Python | >= 3.10 | +| 安装路径 | /opt/cpcheck | +| 服务端口 | 5230 | +| 进程管理 | PM2 | + +> **注意**: 本工具仅支持 Ubuntu Linux,不支持 Windows 及其他发行版。 + +## 一键部署 + +### 1. 获取代码 + +```bash +git clone https://git.bz121.com/dekun/cpcheck.git +cd cpcheck +``` + +### 2. 执行部署脚本 + +```bash +sudo bash deploy/install.sh +``` + +部署脚本会自动完成以下操作: + +1. 检测 Ubuntu 系统和 root 权限 +2. 安装系统依赖(python3、ping、curl 等) +3. 安装 Node.js 和 PM2(如未安装) +4. 复制代码到 `/opt/cpcheck` +5. 创建 Python 虚拟环境并安装依赖 +6. 通过 PM2 启动服务 +7. 配置 PM2 开机自启 +8. 放行防火墙端口 5230(如 UFW 已启用) +9. 执行健康检查 + +### 3. 验证部署 + +```bash +# 检查 PM2 服务状态 +pm2 status + +# 健康检查 +curl http://127.0.0.1:5230/api/health + +# 测试端口检测 +curl -X POST http://127.0.0.1:5230/api/check \ + -H "Content-Type: application/json" \ + -d '{"host":"example.com","port":80,"protocol":"tcp"}' +``` + +### 4. 访问 Web 界面 + +浏览器打开: `http://<服务器公网IP>:5230` + +## 手动部署 + +如需手动部署,按以下步骤操作: + +```bash +# 1. 安装依赖 +apt-get update +apt-get install -y python3 python3-venv python3-pip iputils-ping curl git + +# 2. 安装 Node.js 和 PM2 +curl -fsSL https://deb.nodesource.com/setup_20.x | bash - +apt-get install -y nodejs +npm install -g pm2 + +# 3. 部署代码 +mkdir -p /opt/cpcheck +cp -r . /opt/cpcheck/ +cd /opt/cpcheck + +# 4. 创建虚拟环境 +python3 -m venv venv +venv/bin/pip install -r requirements.txt + +# 5. 创建日志目录 +mkdir -p logs + +# 6. 启动服务 +pm2 start ecosystem.config.js +pm2 save +pm2 startup systemd -u root --hp /root +``` + +## 更新部署 + +```bash +cd /path/to/cpcheck +git pull +sudo bash deploy/install.sh +``` + +部署脚本会自动重启服务。 + +## 防火墙配置 + +### UFW + +```bash +ufw allow 5230/tcp +ufw reload +``` + +### iptables + +```bash +iptables -A INPUT -p tcp --dport 5230 -j ACCEPT +``` + +### 云服务商安全组 + +在云服务商控制台的安全组/防火墙规则中,放行入站 TCP 5230 端口。 + +## 目录结构(部署后) + +``` +/opt/cpcheck/ +├── app/ # 后端代码 +├── static/ # 前端静态文件 +├── venv/ # Python 虚拟环境 +├── logs/ # PM2 日志 +│ ├── error.log +│ └── out.log +├── ecosystem.config.js # PM2 配置 +└── requirements.txt +``` + +## 运维管理 + +### 常用 PM2 命令 + +```bash +pm2 status # 查看所有进程状态 +pm2 logs cpcheck # 实时查看日志 +pm2 logs cpcheck --lines 100 # 查看最近 100 行 +pm2 restart cpcheck # 重启服务 +pm2 stop cpcheck # 停止服务 +pm2 delete cpcheck # 删除进程 +pm2 monit # 监控面板 +``` + +### 查看日志 + +```bash +# PM2 日志 +pm2 logs cpcheck + +# 日志文件 +tail -f /opt/cpcheck/logs/out.log +tail -f /opt/cpcheck/logs/error.log +``` + +### 修改配置 + +编辑 `/opt/cpcheck/app/config.py` 可修改超时时间等参数: + +```python +CONNECT_TIMEOUT = 5 # TCP 连接超时(秒) +UDP_TIMEOUT = 5 # UDP 检测超时(秒) +PING_TIMEOUT = 3 # Ping 超时(秒) +``` + +修改后重启: `pm2 restart cpcheck` + +## 故障排查 + +### 服务无法启动 + +```bash +# 查看错误日志 +pm2 logs cpcheck --err + +# 手动测试启动 +cd /opt/cpcheck +venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 5230 +``` + +### 端口被占用 + +```bash +# 查看 5230 端口占用 +ss -tlnp | grep 5230 + +# 或 +lsof -i :5230 +``` + +### 检测功能异常 + +```bash +# 确认 ping 命令可用 +ping -c 1 8.8.8.8 + +# 确认 DNS 解析正常 +nslookup example.com + +# 手动测试 TCP 连接 +nc -zv example.com 80 +``` + +### PM2 开机不自启 + +```bash +pm2 save +pm2 startup systemd -u root --hp /root +# 执行输出的命令 +``` + +## 卸载 + +```bash +pm2 stop cpcheck +pm2 delete cpcheck +pm2 save +rm -rf /opt/cpcheck +``` + +## 安全建议 + +1. 建议通过 Nginx 反向代理并配置 HTTPS +2. 限制访问 IP 或使用认证(如需公网暴露) +3. 定期更新系统和依赖包 +4. 监控服务器资源使用情况 + +## Nginx 反向代理(可选) + +```nginx +server { + listen 80; + server_name check.example.com; + + location / { + proxy_pass http://127.0.0.1:5230; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + } +} +``` diff --git a/README.md b/README.md new file mode 100644 index 0000000..753a782 --- /dev/null +++ b/README.md @@ -0,0 +1,145 @@ +# CPCHECK - 云服务器端口检测工具 + +CPCHECK 是一款面向云服务器的端口检测工具,提供 Web 界面,支持 TCP/UDP 端口检测,并能快速定位端口问题是「未开放」「防火墙拦截」还是「被墙/网络过滤」。 + +## 功能特性 + +- **TCP/UDP 端口检测** — 输入域名或 IP + 端口,一键检测 +- **智能问题诊断** — 自动分析并给出结论: + - 端口开放,服务正常可达 + - 端口未开放(目标无服务监听) + - 可能被防火墙拦截(主机可达但端口超时) + - 可能被墙或上游网络过滤(主机不可达 + 端口超时) + - DNS 解析失败 + - UDP 状态不确定(协议特性) +- **Web 界面** — 简洁现代的前端,实时展示检测结果与详情 +- **一键部署** — Ubuntu 下 root 用户运行部署脚本即可完成安装 + +## 系统要求 + +- **操作系统**: Ubuntu(仅支持 Linux,不支持 Windows) +- **Python**: >= 3.10 +- **权限**: root 用户 +- **端口**: 5230(Web 服务) + +## 快速开始 + +```bash +# 克隆仓库 +git clone https://git.bz121.com/dekun/cpcheck.git +cd cpcheck + +# 一键部署(需 root) +sudo bash deploy/install.sh +``` + +部署完成后访问: `http://<服务器IP>:5230` + +详细部署说明请参阅 [DEPLOY.md](DEPLOY.md)。 + +## 使用方法 + +1. 打开浏览器访问 `http://<服务器IP>:5230` +2. 输入目标域名或 IP 地址 +3. 输入端口号(1-65535) +4. 选择协议(TCP 或 UDP) +5. 点击「开始检测」,查看诊断结论 + +## 诊断逻辑说明 + +| 检测现象 | 诊断结论 | +|---------|---------| +| TCP 连接成功 | 端口开放 | +| TCP Connection Refused | 端口未开放 | +| TCP 超时 + 主机 Ping 可达 | 可能被防火墙拦截 | +| TCP 超时 + 主机 Ping 不可达 | 可能被墙或网络过滤 | +| DNS 解析失败 | 域名问题 | +| UDP 无响应 | 状态不确定(开放或被过滤) | +| UDP ICMP Port Unreachable | 端口未开放 | + +## API 接口 + +### 健康检查 + +``` +GET /api/health +``` + +### 端口检测 + +``` +POST /api/check +Content-Type: application/json + +{ + "host": "example.com", + "port": 443, + "protocol": "tcp" +} +``` + +响应示例: + +```json +{ + "success": true, + "data": { + "host": "example.com", + "port": 443, + "protocol": "tcp", + "resolved_ip": "93.184.216.34", + "port_status": "open", + "diagnosis": "port_open", + "diagnosis_message": "端口开放,服务正常可达", + "host_reachable": true, + "dns_ok": true, + "latency_ms": 45.2, + "details": ["DNS 解析成功: example.com -> 93.184.216.34", "..."], + "elapsed_ms": 1200.5 + } +} +``` + +## 项目结构 + +``` +cpcheck/ +├── app/ +│ ├── main.py # FastAPI 应用入口 +│ ├── detector.py # 端口检测与诊断逻辑 +│ └── config.py # 配置 +├── static/ +│ ├── index.html # Web 前端页面 +│ ├── css/style.css +│ └── js/app.js +├── deploy/ +│ └── install.sh # 一键部署脚本 +├── tests/ +│ └── test_detector.py # 单元测试 +├── ecosystem.config.js # PM2 配置 +├── requirements.txt +├── README.md +└── DEPLOY.md +``` + +## 运维命令 + +```bash +pm2 status # 查看服务状态 +pm2 logs cpcheck # 查看日志 +pm2 restart cpcheck # 重启服务 +pm2 stop cpcheck # 停止服务 +``` + +## 运行测试 + +```bash +cd /opt/cpcheck +venv/bin/python -m pytest tests/ -v +# 或使用 unittest +venv/bin/python -m unittest discover -s tests -v +``` + +## 许可证 + +MIT License diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..d180431 --- /dev/null +++ b/app/__init__.py @@ -0,0 +1 @@ +"""CPCHECK 应用包""" diff --git a/app/config.py b/app/config.py new file mode 100644 index 0000000..7aea409 --- /dev/null +++ b/app/config.py @@ -0,0 +1,7 @@ +"""应用配置""" + +HOST = "0.0.0.0" +PORT = 5230 +CONNECT_TIMEOUT = 5 +UDP_TIMEOUT = 5 +PING_TIMEOUT = 3 diff --git a/app/detector.py b/app/detector.py new file mode 100644 index 0000000..9ef5f29 --- /dev/null +++ b/app/detector.py @@ -0,0 +1,207 @@ +"""端口检测与问题诊断模块""" + +import socket +import subprocess +import time +from dataclasses import dataclass, field +from enum import Enum + + +class PortStatus(str, Enum): + OPEN = "open" + CLOSED = "closed" + FILTERED = "filtered" + UNKNOWN = "unknown" + + +class DiagnosisType(str, Enum): + PORT_OPEN = "port_open" + PORT_NOT_OPEN = "port_not_open" + LIKELY_FIREWALL = "likely_firewall" + LIKELY_BLOCKED = "likely_blocked" + HOST_UNREACHABLE = "host_unreachable" + DNS_FAILED = "dns_failed" + UDP_AMBIGUOUS = "udp_ambiguous" + + +DIAGNOSIS_MESSAGES = { + DiagnosisType.PORT_OPEN: "端口开放,服务正常可达", + DiagnosisType.PORT_NOT_OPEN: "端口未开放,目标主机上该端口无服务监听", + DiagnosisType.LIKELY_FIREWALL: "端口可能被防火墙拦截,主机可达但端口无响应", + DiagnosisType.LIKELY_BLOCKED: "端口可能被墙或上游网络过滤,连接超时", + DiagnosisType.HOST_UNREACHABLE: "目标主机不可达,请检查 IP/域名是否正确", + DiagnosisType.DNS_FAILED: "域名解析失败,请检查域名是否正确", + DiagnosisType.UDP_AMBIGUOUS: "UDP 端口状态不确定,可能开放或被过滤(UDP 协议特性)", +} + + +@dataclass +class CheckResult: + host: str + port: int + protocol: str + resolved_ip: str = "" + port_status: PortStatus = PortStatus.UNKNOWN + diagnosis: DiagnosisType = DiagnosisType.UDP_AMBIGUOUS + diagnosis_message: str = "" + host_reachable: bool = False + dns_ok: bool = False + latency_ms: float | None = None + details: list[str] = field(default_factory=list) + elapsed_ms: float = 0 + + +def resolve_host(host: str) -> tuple[str | None, str | None]: + """解析域名,返回 (ip, error)""" + try: + ip = socket.gethostbyname(host.strip()) + return ip, None + except socket.gaierror as e: + return None, str(e) + + +def ping_host(host: str, timeout: int = 3) -> bool: + """检测主机是否可达(ICMP ping)""" + try: + result = subprocess.run( + ["ping", "-c", "1", "-W", str(timeout), host], + capture_output=True, + timeout=timeout + 2, + ) + return result.returncode == 0 + except (subprocess.TimeoutExpired, FileNotFoundError, OSError): + return False + + +def check_tcp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]: + """TCP 端口检测,返回 (状态, 延迟ms, 详情)""" + start = time.monotonic() + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + try: + sock.connect((host, port)) + latency = (time.monotonic() - start) * 1000 + return PortStatus.OPEN, round(latency, 2), f"TCP 连接成功,耗时 {latency:.0f}ms" + except ConnectionRefusedError: + return PortStatus.CLOSED, None, "TCP 连接被拒绝 (Connection Refused)" + except socket.timeout: + return PortStatus.FILTERED, None, f"TCP 连接超时 ({timeout}s)" + except OSError as e: + if e.errno in (113,): # EHOSTUNREACH + return PortStatus.FILTERED, None, f"主机不可达: {e}" + return PortStatus.UNKNOWN, None, f"TCP 检测异常: {e}" + finally: + sock.close() + + +def check_udp(host: str, port: int, timeout: float = 5) -> tuple[PortStatus, float | None, str]: + """UDP 端口检测,返回 (状态, 延迟ms, 详情)""" + start = time.monotonic() + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + sock.settimeout(timeout) + try: + sock.connect((host, port)) + sock.send(b"\x00") + try: + sock.recv(1024) + latency = (time.monotonic() - start) * 1000 + return PortStatus.OPEN, round(latency, 2), f"UDP 收到响应,耗时 {latency:.0f}ms" + except ConnectionRefusedError: + return PortStatus.CLOSED, None, "UDP 收到 ICMP Port Unreachable,端口未开放" + except socket.timeout: + return PortStatus.UNKNOWN, None, "UDP 无响应(开放或被过滤,无法确定)" + except OSError as e: + return PortStatus.UNKNOWN, None, f"UDP 检测异常: {e}" + finally: + sock.close() + + +def diagnose( + port_status: PortStatus, + protocol: str, + host_reachable: bool, + dns_ok: bool, +) -> tuple[DiagnosisType, str]: + """根据检测结果给出诊断结论""" + if not dns_ok: + return DiagnosisType.DNS_FAILED, DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED] + + if port_status == PortStatus.OPEN: + return DiagnosisType.PORT_OPEN, DIAGNOSIS_MESSAGES[DiagnosisType.PORT_OPEN] + + if port_status == PortStatus.CLOSED: + return DiagnosisType.PORT_NOT_OPEN, DIAGNOSIS_MESSAGES[DiagnosisType.PORT_NOT_OPEN] + + if port_status == PortStatus.FILTERED: + if host_reachable: + return ( + DiagnosisType.LIKELY_FIREWALL, + DIAGNOSIS_MESSAGES[DiagnosisType.LIKELY_FIREWALL], + ) + return ( + DiagnosisType.LIKELY_BLOCKED, + DIAGNOSIS_MESSAGES[DiagnosisType.LIKELY_BLOCKED], + ) + + if protocol == "udp" and port_status == PortStatus.UNKNOWN: + if not host_reachable: + return ( + DiagnosisType.HOST_UNREACHABLE, + DIAGNOSIS_MESSAGES[DiagnosisType.HOST_UNREACHABLE], + ) + return DiagnosisType.UDP_AMBIGUOUS, DIAGNOSIS_MESSAGES[DiagnosisType.UDP_AMBIGUOUS] + + if not host_reachable: + return ( + DiagnosisType.HOST_UNREACHABLE, + DIAGNOSIS_MESSAGES[DiagnosisType.HOST_UNREACHABLE], + ) + + return DiagnosisType.UDP_AMBIGUOUS, DIAGNOSIS_MESSAGES[DiagnosisType.UDP_AMBIGUOUS] + + +def run_check(host: str, port: int, protocol: str) -> CheckResult: + """执行完整的端口检测流程""" + start = time.monotonic() + result = CheckResult(host=host, port=port, protocol=protocol.lower()) + details: list[str] = [] + + # 1. DNS 解析 + resolved_ip, dns_error = resolve_host(host) + if dns_error: + result.dns_ok = False + result.diagnosis = DiagnosisType.DNS_FAILED + result.diagnosis_message = DIAGNOSIS_MESSAGES[DiagnosisType.DNS_FAILED] + result.details.append(f"DNS 解析失败: {dns_error}") + result.elapsed_ms = round((time.monotonic() - start) * 1000, 2) + return result + + result.dns_ok = True + result.resolved_ip = resolved_ip # type: ignore[assignment] + details.append(f"DNS 解析成功: {host} -> {resolved_ip}") + + # 2. 主机可达性 + host_reachable = ping_host(resolved_ip) # type: ignore[arg-type] + result.host_reachable = host_reachable + details.append( + f"主机 Ping: {'可达' if host_reachable else '不可达或无响应'}" + ) + + # 3. 端口检测 + if protocol.lower() == "tcp": + port_status, latency, detail = check_tcp(resolved_ip, port) # type: ignore[arg-type] + else: + port_status, latency, detail = check_udp(resolved_ip, port) # type: ignore[arg-type] + + result.port_status = port_status + result.latency_ms = latency + details.append(detail) + + # 4. 诊断 + diagnosis, message = diagnose(port_status, protocol, host_reachable, result.dns_ok) + result.diagnosis = diagnosis + result.diagnosis_message = message + result.details = details + result.elapsed_ms = round((time.monotonic() - start) * 1000, 2) + + return result diff --git a/app/main.py b/app/main.py new file mode 100644 index 0000000..1b4fe21 --- /dev/null +++ b/app/main.py @@ -0,0 +1,101 @@ +"""CPCHECK - 云服务器端口检测工具 API""" + +import os +from pathlib import Path + +from fastapi import FastAPI, HTTPException +from fastapi.responses import FileResponse +from fastapi.staticfiles import StaticFiles +from pydantic import BaseModel, Field, field_validator + +from app.config import HOST, PORT +from app.detector import CheckResult, run_check + +BASE_DIR = Path(__file__).resolve().parent.parent +STATIC_DIR = BASE_DIR / "static" + +app = FastAPI( + title="CPCHECK", + description="云服务器端口检测工具 - 支持 TCP/UDP 端口检测与问题诊断", + version="1.0.0", +) + +if STATIC_DIR.exists(): + app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") + + +class CheckRequest(BaseModel): + host: str = Field(..., min_length=1, max_length=255, description="域名或 IP 地址") + port: int = Field(..., ge=1, le=65535, description="端口号") + protocol: str = Field(default="tcp", description="协议类型: tcp 或 udp") + + @field_validator("protocol") + @classmethod + def validate_protocol(cls, v: str) -> str: + v = v.lower().strip() + if v not in ("tcp", "udp"): + raise ValueError("协议必须是 tcp 或 udp") + return v + + @field_validator("host") + @classmethod + def validate_host(cls, v: str) -> str: + v = v.strip() + if not v: + raise ValueError("主机地址不能为空") + return v + + +class CheckResponse(BaseModel): + success: bool + data: dict | None = None + error: str | None = None + + +def result_to_dict(result: CheckResult) -> dict: + return { + "host": result.host, + "port": result.port, + "protocol": result.protocol, + "resolved_ip": result.resolved_ip, + "port_status": result.port_status.value, + "diagnosis": result.diagnosis.value, + "diagnosis_message": result.diagnosis_message, + "host_reachable": result.host_reachable, + "dns_ok": result.dns_ok, + "latency_ms": result.latency_ms, + "details": result.details, + "elapsed_ms": result.elapsed_ms, + } + + +@app.get("/") +async def index(): + index_file = STATIC_DIR / "index.html" + if index_file.exists(): + return FileResponse(str(index_file)) + return {"message": "CPCHECK API is running. Visit /docs for API documentation."} + + +@app.get("/api/health") +async def health(): + return {"status": "ok", "service": "cpcheck"} + + +@app.post("/api/check", response_model=CheckResponse) +async def check_port(req: CheckRequest): + try: + result = run_check(req.host, req.port, req.protocol) + return CheckResponse(success=True, data=result_to_dict(result)) + except Exception as e: + raise HTTPException(status_code=500, detail=str(e)) + + +def start(): + import uvicorn + + uvicorn.run("app.main:app", host=HOST, port=PORT, reload=False) + + +if __name__ == "__main__": + start() diff --git a/deploy/install.sh b/deploy/install.sh new file mode 100644 index 0000000..e69de29 diff --git a/ecosystem.config.js b/ecosystem.config.js new file mode 100644 index 0000000..a8aae10 --- /dev/null +++ b/ecosystem.config.js @@ -0,0 +1,21 @@ +module.exports = { + apps: [ + { + name: "cpcheck", + script: "venv/bin/uvicorn", + args: "app.main:app --host 0.0.0.0 --port 5230", + cwd: "/opt/cpcheck", + interpreter: "none", + instances: 1, + autorestart: true, + watch: false, + max_memory_restart: "200M", + env: { + NODE_ENV: "production", + }, + error_file: "/opt/cpcheck/logs/error.log", + out_file: "/opt/cpcheck/logs/out.log", + log_date_format: "YYYY-MM-DD HH:mm:ss", + }, + ], +}; diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0fb8d83 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +fastapi==0.115.6 +uvicorn[standard]==0.34.0 +pydantic==2.10.4 diff --git a/static/css/style.css b/static/css/style.css new file mode 100644 index 0000000..af00343 --- /dev/null +++ b/static/css/style.css @@ -0,0 +1,278 @@ +:root { + --bg: #0f1419; + --surface: #1a2332; + --surface-hover: #243044; + --border: #2d3a4f; + --text: #e7ecf3; + --text-muted: #8b9cb3; + --primary: #3b82f6; + --primary-hover: #2563eb; + --success: #22c55e; + --warning: #f59e0b; + --danger: #ef4444; + --info: #06b6d4; + --radius: 10px; +} + +* { + margin: 0; + padding: 0; + box-sizing: border-box; +} + +body { + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; + background: var(--bg); + color: var(--text); + min-height: 100vh; + line-height: 1.6; +} + +.container { + max-width: 800px; + margin: 0 auto; + padding: 2rem 1.5rem; + min-height: 100vh; + display: flex; + flex-direction: column; +} + +header { + text-align: center; + margin-bottom: 2.5rem; +} + +header h1 { + font-size: 2rem; + font-weight: 700; + letter-spacing: 0.05em; + background: linear-gradient(135deg, var(--primary), var(--info)); + -webkit-background-clip: text; + -webkit-text-fill-color: transparent; + background-clip: text; +} + +.subtitle { + color: var(--text-muted); + margin-top: 0.5rem; + font-size: 0.95rem; +} + +.check-form { + background: var(--surface); + border: 1px solid var(--border); + border-radius: var(--radius); + padding: 1.5rem; + margin-bottom: 1.5rem; +} + +.form-row { + display: flex; + gap: 1rem; + margin-bottom: 1.25rem; + flex-wrap: wrap; +} + +.form-group { + display: flex; + flex-direction: column; + gap: 0.4rem; + flex: 1; + min-width: 120px; +} + +.form-group.flex-2 { + flex: 2; + min-width: 200px; +} + +.form-group label { + font-size: 0.85rem; + color: var(--text-muted); + font-weight: 500; +} + +.form-group input, +.form-group select { + background: var(--bg); + border: 1px solid var(--border); + border-radius: 6px; + padding: 0.65rem 0.85rem; + color: var(--text); + font-size: 0.95rem; + transition: border-color 0.2s; +} + +.form-group input:focus, +.form-group select:focus { + outline: none; + border-color: var(--primary); +} + +.btn-primary { + width: 100%; + background: var(--primary); + color: #fff; + border: none; + border-radius: 6px; + padding: 0.75rem 1.5rem; + font-size: 1rem; + font-weight: 600; + cursor: pointer; + transition: background 0.2s; +} + +.btn-primary:hover:not(:disabled) { + background: var(--primary-hover); +} + +.btn-primary:disabled { + opacity: 0.6; + cursor: not-allowed; +} + +.error-box { + background: rgba(239, 68, 68, 0.1); + border: 1px solid var(--danger); + border-radius: var(--radius); + padding: 1rem 1.25rem; + color: var(--danger); + margin-bottom: 1.5rem; +} + +.result-box { + background: var(--surface); + border: 1px solid var(--border); + border-radius: var(--radius); + padding: 1.5rem; + animation: fadeIn 0.3s ease; +} + +@keyframes fadeIn { + from { opacity: 0; transform: translateY(8px); } + to { opacity: 1; transform: translateY(0); } +} + +.result-header { + display: flex; + justify-content: space-between; + align-items: center; + margin-bottom: 1.25rem; +} + +.result-header h2 { + font-size: 1.1rem; + font-weight: 600; +} + +.elapsed { + font-size: 0.85rem; + color: var(--text-muted); +} + +.diagnosis-card { + display: flex; + align-items: flex-start; + gap: 1rem; + padding: 1.25rem; + border-radius: var(--radius); + margin-bottom: 1.5rem; + border: 1px solid var(--border); +} + +.diagnosis-card.success { background: rgba(34, 197, 94, 0.08); border-color: rgba(34, 197, 94, 0.3); } +.diagnosis-card.warning { background: rgba(245, 158, 11, 0.08); border-color: rgba(245, 158, 11, 0.3); } +.diagnosis-card.danger { background: rgba(239, 68, 68, 0.08); border-color: rgba(239, 68, 68, 0.3); } +.diagnosis-card.info { background: rgba(6, 182, 212, 0.08); border-color: rgba(6, 182, 212, 0.3); } + +.diagnosis-icon { + font-size: 1.75rem; + line-height: 1; + flex-shrink: 0; +} + +.diagnosis-content h3 { + font-size: 1rem; + font-weight: 600; + margin-bottom: 0.25rem; +} + +.diagnosis-content p { + font-size: 0.9rem; + color: var(--text-muted); +} + +.info-grid { + display: grid; + grid-template-columns: repeat(auto-fill, minmax(140px, 1fr)); + gap: 0.75rem; + margin-bottom: 1.5rem; +} + +.info-item { + background: var(--bg); + border-radius: 6px; + padding: 0.75rem; +} + +.info-label { + display: block; + font-size: 0.75rem; + color: var(--text-muted); + margin-bottom: 0.2rem; +} + +.info-value { + font-size: 0.9rem; + font-weight: 500; +} + +.status-open { color: var(--success); } +.status-closed { color: var(--danger); } +.status-filtered { color: var(--warning); } +.status-unknown { color: var(--text-muted); } + +.details-section h3 { + font-size: 0.9rem; + color: var(--text-muted); + margin-bottom: 0.75rem; + font-weight: 500; +} + +.details-list { + list-style: none; + display: flex; + flex-direction: column; + gap: 0.5rem; +} + +.details-list li { + font-size: 0.85rem; + color: var(--text-muted); + padding-left: 1rem; + position: relative; +} + +.details-list li::before { + content: "›"; + position: absolute; + left: 0; + color: var(--primary); +} + +footer { + margin-top: auto; + padding-top: 2rem; + text-align: center; + color: var(--text-muted); + font-size: 0.8rem; +} + +@media (max-width: 600px) { + .form-row { + flex-direction: column; + } + .form-group.flex-2 { + min-width: 100%; + } +} diff --git a/static/index.html b/static/index.html new file mode 100644 index 0000000..5f68964 --- /dev/null +++ b/static/index.html @@ -0,0 +1,94 @@ + + + + + + CPCHECK - 云服务器端口检测 + + + +
+
+

CPCHECK

+

云服务器端口检测工具 · 快速定位端口问题

+
+ +
+
+
+
+ + +
+
+ + +
+
+ + +
+
+ +
+ + + + +
+ +
+

CPCHECK v1.0 · 支持 TCP/UDP 端口检测

+
+
+ + + + diff --git a/static/js/app.js b/static/js/app.js new file mode 100644 index 0000000..e14e46a --- /dev/null +++ b/static/js/app.js @@ -0,0 +1,103 @@ +const DIAGNOSIS_CONFIG = { + port_open: { title: "端口开放", icon: "✓", class: "success" }, + port_not_open: { title: "端口未开放", icon: "✗", class: "danger" }, + likely_firewall: { title: "可能被防火墙拦截", icon: "⚠", class: "warning" }, + likely_blocked: { title: "可能被墙或过滤", icon: "⊘", class: "danger" }, + host_unreachable: { title: "主机不可达", icon: "✗", class: "danger" }, + dns_failed: { title: "DNS 解析失败", icon: "✗", class: "danger" }, + udp_ambiguous: { title: "UDP 状态不确定", icon: "?", class: "info" }, +}; + +const STATUS_LABELS = { + open: { text: "开放", class: "status-open" }, + closed: { text: "关闭", class: "status-closed" }, + filtered: { text: "被过滤", class: "status-filtered" }, + unknown: { text: "不确定", class: "status-unknown" }, +}; + +const form = document.getElementById("checkForm"); +const submitBtn = document.getElementById("submitBtn"); +const btnText = submitBtn.querySelector(".btn-text"); +const btnLoading = submitBtn.querySelector(".btn-loading"); +const errorBox = document.getElementById("errorBox"); +const resultBox = document.getElementById("resultBox"); + +form.addEventListener("submit", async (e) => { + e.preventDefault(); + hideError(); + resultBox.hidden = true; + setLoading(true); + + const host = document.getElementById("host").value.trim(); + const port = parseInt(document.getElementById("port").value, 10); + const protocol = document.getElementById("protocol").value; + + try { + const resp = await fetch("/api/check", { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ host, port, protocol }), + }); + + const json = await resp.json(); + + if (!resp.ok) { + throw new Error(json.detail || json.error || "请求失败"); + } + + if (!json.success || !json.data) { + throw new Error(json.error || "检测失败"); + } + + renderResult(json.data); + } catch (err) { + showError(err.message || "网络错误,请稍后重试"); + } finally { + setLoading(false); + } +}); + +function setLoading(loading) { + submitBtn.disabled = loading; + btnText.hidden = loading; + btnLoading.hidden = !loading; +} + +function showError(msg) { + errorBox.textContent = msg; + errorBox.hidden = false; +} + +function hideError() { + errorBox.hidden = true; +} + +function renderResult(data) { + const diag = DIAGNOSIS_CONFIG[data.diagnosis] || DIAGNOSIS_CONFIG.udp_ambiguous; + const status = STATUS_LABELS[data.port_status] || STATUS_LABELS.unknown; + + const card = document.getElementById("diagnosisCard"); + card.className = `diagnosis-card ${diag.class}`; + document.getElementById("diagnosisIcon").textContent = diag.icon; + document.getElementById("diagnosisTitle").textContent = diag.title; + document.getElementById("diagnosisMessage").textContent = data.diagnosis_message; + + document.getElementById("elapsed").textContent = `耗时 ${data.elapsed_ms}ms`; + document.getElementById("infoTarget").textContent = `${data.host}:${data.port} (${data.protocol.toUpperCase()})`; + document.getElementById("infoIp").textContent = data.resolved_ip || "-"; + document.getElementById("infoStatus").innerHTML = + `${status.text}`; + document.getElementById("infoReachable").textContent = data.host_reachable ? "是" : "否"; + document.getElementById("infoLatency").textContent = + data.latency_ms != null ? `${data.latency_ms} ms` : "-"; + + const list = document.getElementById("detailsList"); + list.innerHTML = ""; + (data.details || []).forEach((d) => { + const li = document.createElement("li"); + li.textContent = d; + list.appendChild(li); + }); + + resultBox.hidden = false; +} diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..4540de9 --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1 @@ +"""测试包""" diff --git a/tests/test_detector.py b/tests/test_detector.py new file mode 100644 index 0000000..fc55948 --- /dev/null +++ b/tests/test_detector.py @@ -0,0 +1,119 @@ +"""CPCHECK 单元测试""" + +import socket +import unittest +from unittest.mock import MagicMock, patch + +from app.detector import ( + CheckResult, + DiagnosisType, + PortStatus, + check_tcp, + diagnose, + resolve_host, + run_check, +) + + +class TestResolveHost(unittest.TestCase): + def test_resolve_localhost(self): + ip, err = resolve_host("127.0.0.1") + self.assertIsNone(err) + self.assertEqual(ip, "127.0.0.1") + + def test_resolve_invalid(self): + ip, err = resolve_host("this-domain-does-not-exist-xyz123.invalid") + self.assertIsNone(ip) + self.assertIsNotNone(err) + + +class TestCheckTcp(unittest.TestCase): + def test_tcp_open(self): + with patch("socket.socket") as mock_sock_cls: + mock_sock = MagicMock() + mock_sock_cls.return_value = mock_sock + status, latency, detail = check_tcp("127.0.0.1", 80, timeout=1) + self.assertEqual(status, PortStatus.OPEN) + self.assertIsNotNone(latency) + + def test_tcp_closed(self): + with patch("socket.socket") as mock_sock_cls: + mock_sock = MagicMock() + mock_sock.connect.side_effect = ConnectionRefusedError + mock_sock_cls.return_value = mock_sock + status, latency, detail = check_tcp("127.0.0.1", 59999, timeout=1) + self.assertEqual(status, PortStatus.CLOSED) + self.assertIsNone(latency) + + def test_tcp_timeout(self): + with patch("socket.socket") as mock_sock_cls: + mock_sock = MagicMock() + mock_sock.connect.side_effect = socket.timeout + mock_sock_cls.return_value = mock_sock + status, latency, detail = check_tcp("127.0.0.1", 59998, timeout=1) + self.assertEqual(status, PortStatus.FILTERED) + + +class TestDiagnose(unittest.TestCase): + def test_port_open(self): + d, msg = diagnose(PortStatus.OPEN, "tcp", True, True) + self.assertEqual(d, DiagnosisType.PORT_OPEN) + + def test_port_closed(self): + d, msg = diagnose(PortStatus.CLOSED, "tcp", True, True) + self.assertEqual(d, DiagnosisType.PORT_NOT_OPEN) + + def test_filtered_reachable(self): + d, msg = diagnose(PortStatus.FILTERED, "tcp", True, True) + self.assertEqual(d, DiagnosisType.LIKELY_FIREWALL) + + def test_filtered_unreachable(self): + d, msg = diagnose(PortStatus.FILTERED, "tcp", False, True) + self.assertEqual(d, DiagnosisType.LIKELY_BLOCKED) + + def test_dns_failed(self): + d, msg = diagnose(PortStatus.UNKNOWN, "tcp", False, False) + self.assertEqual(d, DiagnosisType.DNS_FAILED) + + +class TestRunCheck(unittest.TestCase): + @patch("app.detector.ping_host", return_value=True) + @patch("app.detector.check_tcp") + @patch("app.detector.resolve_host", return_value=("93.184.216.34", None)) + def test_run_check_success(self, mock_resolve, mock_tcp, mock_ping): + mock_tcp.return_value = (PortStatus.OPEN, 50.0, "TCP 连接成功") + result = run_check("example.com", 80, "tcp") + self.assertIsInstance(result, CheckResult) + self.assertTrue(result.dns_ok) + self.assertEqual(result.diagnosis, DiagnosisType.PORT_OPEN) + + @patch("app.detector.resolve_host", return_value=(None, "Name resolution failed")) + def test_run_check_dns_fail(self, mock_resolve): + result = run_check("invalid.invalid", 80, "tcp") + self.assertFalse(result.dns_ok) + self.assertEqual(result.diagnosis, DiagnosisType.DNS_FAILED) + + +class TestAPIValidation(unittest.TestCase): + def test_check_request_validation(self): + try: + from pydantic import ValidationError + from app.main import CheckRequest + except ImportError: + self.skipTest("FastAPI/Pydantic not installed") + + req = CheckRequest(host="example.com", port=443, protocol="tcp") + self.assertEqual(req.protocol, "tcp") + + with self.assertRaises(ValidationError): + CheckRequest(host="example.com", port=443, protocol="icmp") + + with self.assertRaises(ValidationError): + CheckRequest(host="example.com", port=0, protocol="tcp") + + with self.assertRaises(ValidationError): + CheckRequest(host="", port=80, protocol="tcp") + + +if __name__ == "__main__": + unittest.main()