Files
2026-05-21 17:51:21 +08:00

452 lines
14 KiB
Python

"""
整机许可客户端:设备 ID、激活码兑换、定期云端校验。
签发与管理 Web 为独立云端服务,不在本仓库内。
"""
from __future__ import annotations
import hashlib
import json
import os
import platform
import subprocess
from datetime import datetime, timedelta, timezone
from functools import wraps
from typing import Any, Callable, Optional
import requests
_REPO_ROOT = os.path.dirname(os.path.abspath(__file__))
LICENSE_DIR = os.path.join(_REPO_ROOT, ".license")
DEVICE_ID_FILE = os.path.join(LICENSE_DIR, "device.id")
CACHE_FILE = os.path.join(LICENSE_DIR, "license.cache")
_TEMPLATE_DIR = os.path.join(_REPO_ROOT, "license_templates")
BJ_TZ = timezone(timedelta(hours=8))
def _apply_env_lines(text: str, *, license_only: bool, override: bool) -> None:
text = text.replace("\x00", "")
for line in text.splitlines():
raw = line.strip()
if not raw or raw.startswith("#") or "=" not in raw:
continue
key, value = raw.split("=", 1)
clean_key = key.strip().lstrip("\ufeff")
if license_only and not clean_key.startswith("LICENSE_"):
continue
clean_value = value.strip().strip('"').strip("'")
if not clean_value:
continue
if not override and clean_key in os.environ and (os.environ.get(clean_key) or "").strip():
continue
os.environ[clean_key] = clean_value
def _read_env_file(path: str) -> str:
if not os.path.isfile(path):
return ""
try:
raw_bytes = open(path, "rb").read()
except OSError:
return ""
for enc in ("utf-8-sig", "utf-16", "utf-16-le", "utf-16-be"):
try:
return raw_bytes.decode(enc)
except Exception:
continue
return raw_bytes.decode("utf-8", errors="ignore")
def load_shared_env() -> None:
"""
加载顺序(后加载的可补空位):
1. 仓库根 license.env — 默认授权地址 https://sq.bz121.com
2. 仓库根 .env 中的 LICENSE_* — 可覆盖密钥等(勿提交 Git)
子目录 .env 若已先加载且已有 LICENSE_*,则不被覆盖。
"""
license_path = os.path.join(_REPO_ROOT, "license.env")
_apply_env_lines(_read_env_file(license_path), license_only=True, override=True)
root_env = os.path.join(_REPO_ROOT, ".env")
_apply_env_lines(_read_env_file(root_env), license_only=True, override=True)
def _env_bool(name: str, default: bool = False) -> bool:
return (os.getenv(name) or "").strip().lower() in ("1", "true", "yes", "on")
def license_disabled() -> bool:
return _env_bool("LICENSE_DISABLED", False)
def license_api_url() -> str:
return (os.getenv("LICENSE_API_URL") or "").strip().rstrip("/")
def license_client_key() -> str:
return (os.getenv("LICENSE_CLIENT_KEY") or "").strip()
def check_interval_days() -> int:
try:
return max(1, int(os.getenv("LICENSE_CHECK_INTERVAL_DAYS", "3")))
except ValueError:
return 3
def offline_grace_days() -> int:
try:
return max(1, int(os.getenv("LICENSE_OFFLINE_GRACE_DAYS", "7")))
except ValueError:
return 7
def wechat_id() -> str:
return (os.getenv("LICENSE_WECHAT_ID") or "dekun03").strip()
def wechat_remark_hint(device_id: str) -> str:
custom = (os.getenv("LICENSE_WECHAT_REMARK") or "").strip()
if custom:
return custom
return device_id
def _machine_fingerprint() -> str:
parts = [
platform.node() or "",
platform.system() or "",
platform.release() or "",
platform.machine() or "",
]
if platform.system() == "Windows":
try:
out = subprocess.check_output(
["wmic", "csproduct", "get", "uuid"],
stderr=subprocess.DEVNULL,
timeout=8,
text=True,
encoding="utf-8",
errors="ignore",
)
for line in out.splitlines():
line = line.strip()
if line and line.lower() != "uuid":
parts.append(line)
break
except Exception:
pass
else:
for path in ("/etc/machine-id", "/var/lib/dbus/machine-id"):
try:
if os.path.isfile(path):
parts.append(open(path, encoding="utf-8", errors="ignore").read().strip())
break
except Exception:
pass
return "|".join(parts)
def get_device_id() -> str:
os.makedirs(LICENSE_DIR, exist_ok=True)
if os.path.isfile(DEVICE_ID_FILE):
with open(DEVICE_ID_FILE, encoding="utf-8") as f:
existing = f.read().strip()
if existing:
return existing
raw = _machine_fingerprint()
device_id = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
with open(DEVICE_ID_FILE, "w", encoding="utf-8") as f:
f.write(device_id)
return device_id
def _parse_iso(value: str | None) -> Optional[datetime]:
if not value:
return None
s = str(value).strip()
if not s:
return None
if s.endswith("Z"):
s = s[:-1] + "+00:00"
try:
dt = datetime.fromisoformat(s)
except ValueError:
return None
if dt.tzinfo is None:
dt = dt.replace(tzinfo=BJ_TZ)
return dt.astimezone(BJ_TZ)
def _now_bj() -> datetime:
return datetime.now(BJ_TZ)
def _load_cache() -> Optional[dict[str, Any]]:
if not os.path.isfile(CACHE_FILE):
return None
try:
with open(CACHE_FILE, encoding="utf-8") as f:
data = json.load(f)
return data if isinstance(data, dict) else None
except Exception:
return None
def _save_cache(data: dict[str, Any]) -> None:
os.makedirs(LICENSE_DIR, exist_ok=True)
with open(CACHE_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def _api_headers() -> dict[str, str]:
headers = {"Content-Type": "application/json"}
key = license_client_key()
if key:
headers["X-Client-Key"] = key
return headers
def _api_configured() -> bool:
return bool(license_api_url() and license_client_key())
def redeem_code(code: str) -> tuple[bool, str]:
if license_disabled():
return True, "许可校验已关闭(开发模式)"
if not _api_configured():
return False, "未配置 LICENSE_API_URL / LICENSE_CLIENT_KEY"
device_id = get_device_id()
url = f"{license_api_url()}/v1/redeem"
try:
r = requests.post(
url,
json={"device_id": device_id, "code": (code or "").strip()},
headers=_api_headers(),
timeout=30,
)
body = r.json() if r.content else {}
except requests.RequestException as e:
return False, f"无法连接授权服务:{e}"
except ValueError:
return False, "授权服务返回非 JSON"
if r.status_code >= 400 or not body.get("ok"):
return False, (body.get("message") or body.get("detail") or f"兑换失败 HTTP {r.status_code}")
expires_at = body.get("expires_at") or ""
cache = {
"device_id": device_id,
"subscription_id": body.get("subscription_id") or "",
"plan": body.get("plan") or "",
"expires_at": expires_at,
"last_validated_at": _now_bj().isoformat(),
"last_validated_ok_at": _now_bj().isoformat(),
}
_save_cache(cache)
return True, "激活成功"
def validate_remote() -> tuple[bool, str, Optional[dict[str, Any]]]:
if license_disabled():
return True, "", _load_cache()
if not _api_configured():
return False, "未配置授权服务", None
cache = _load_cache()
if not cache or not cache.get("subscription_id"):
return False, "未激活", None
device_id = get_device_id()
url = f"{license_api_url()}/v1/validate"
try:
r = requests.post(
url,
json={
"device_id": device_id,
"subscription_id": cache.get("subscription_id"),
},
headers=_api_headers(),
timeout=20,
)
body = r.json() if r.content else {}
except requests.RequestException as e:
return False, f"校验请求失败:{e}", cache
except ValueError:
return False, "授权服务返回非 JSON", cache
now_iso = _now_bj().isoformat()
cache["last_validated_at"] = now_iso
if r.status_code >= 400 or not body.get("ok"):
_save_cache(cache)
return False, body.get("message") or body.get("detail") or "校验失败", cache
if not body.get("valid"):
reason = body.get("reason") or body.get("message") or "许可无效"
_save_cache(cache)
return False, reason, cache
if body.get("expires_at"):
cache["expires_at"] = body["expires_at"]
if body.get("plan"):
cache["plan"] = body["plan"]
cache["last_validated_ok_at"] = now_iso
_save_cache(cache)
return True, "", cache
def _needs_periodic_check(last_validated_at: str | None) -> bool:
if not last_validated_at:
return True
last = _parse_iso(last_validated_at)
if not last:
return True
return _now_bj() - last >= timedelta(days=check_interval_days())
def _within_offline_grace(cache: dict[str, Any]) -> bool:
ref = cache.get("last_validated_ok_at") or cache.get("last_validated_at")
last_ok = _parse_iso(ref if isinstance(ref, str) else None)
if not last_ok:
return False
return _now_bj() - last_ok <= timedelta(days=offline_grace_days())
def _is_expired(cache: dict[str, Any]) -> bool:
exp = _parse_iso(cache.get("expires_at"))
if not exp:
return True
return _now_bj() > exp
def is_licensed(*, force_validate: bool = False) -> bool:
if license_disabled():
return True
cache = _load_cache()
if not cache:
return False
if cache.get("device_id") and cache.get("device_id") != get_device_id():
return False
if _is_expired(cache):
return False
if force_validate or _needs_periodic_check(cache.get("last_validated_at")):
ok, _msg, updated = validate_remote()
if ok:
return True
use_cache = updated or cache
if _within_offline_grace(use_cache) and not _is_expired(use_cache):
return True
return False
return True
def license_status() -> dict[str, Any]:
device_id = get_device_id()
cache = _load_cache() or {}
exp = _parse_iso(cache.get("expires_at"))
licensed = is_licensed()
days_left = None
if exp:
days_left = max(0, (exp - _now_bj()).days)
return {
"device_id": device_id,
"licensed": licensed,
"disabled": license_disabled(),
"api_configured": _api_configured(),
"subscription_id": cache.get("subscription_id") or "",
"plan": cache.get("plan") or "",
"expires_at": cache.get("expires_at") or "",
"expires_at_display": exp.strftime("%Y-%m-%d %H:%M") if exp else "",
"days_left": days_left,
"last_validated_at": cache.get("last_validated_at") or "",
"check_interval_days": check_interval_days(),
"offline_grace_days": offline_grace_days(),
"wechat_id": wechat_id(),
"wechat_remark": wechat_remark_hint(device_id),
"plans": [
{"id": "monthly", "name": "月卡", "price": 199, "days": 30},
{"id": "quarterly", "name": "季卡", "price": 399, "days": 90},
{"id": "yearly", "name": "年卡", "price": 699, "days": 365},
],
}
def _render_license_html(ctx: dict[str, Any]) -> str:
path = os.path.join(_TEMPLATE_DIR, "license.html")
with open(path, encoding="utf-8") as f:
template = f.read()
from jinja2 import Template
return Template(template).render(**ctx)
def init_flask_app(app, exchange_display: str = "交易系统") -> None:
"""注册 /license 路由与全局门禁。"""
load_shared_env()
@app.route("/license", methods=["GET", "POST"])
def license_page():
from flask import flash, get_flashed_messages, redirect, render_template_string, request, url_for
if request.method == "POST":
code = (request.form.get("activation_code") or "").strip()
if not code:
flash("请输入激活码")
else:
ok, msg = redeem_code(code)
if ok:
flash(msg or "激活成功", "ok")
return redirect(url_for("license_page"))
flash(msg or "激活失败")
return redirect(url_for("license_page"))
status = license_status()
html = _render_license_html(
{
**status,
"exchange_display": exchange_display,
"messages": get_flashed_messages(with_categories=True),
}
)
return render_template_string(html)
@app.before_request
def _license_gate():
from flask import redirect, request, url_for
if license_disabled():
return None
endpoint = request.endpoint or ""
if endpoint in ("license_page", "login", "logout") or endpoint.startswith("static"):
return None
if not is_licensed():
return redirect(url_for("license_page"))
return None
def init_fastapi_app(app) -> None:
from fastapi import Request
from fastapi.responses import HTMLResponse, RedirectResponse
load_shared_env()
@app.get("/license", response_class=HTMLResponse)
async def license_page(request: Request):
return HTMLResponse(_render_license_html({**license_status(), "exchange_display": "中控", "messages": []}))
@app.middleware("http")
async def _license_gate(request: Request, call_next):
if license_disabled():
return await call_next(request)
path = request.url.path
if path == "/license" or path.startswith("/assets"):
return await call_next(request)
if not is_licensed():
return RedirectResponse(url="/license", status_code=302)
return await call_next(request)