""" 整机许可客户端:设备 ID、激活码兑换、定期云端校验。 签发与管理 Web 为独立云端服务,不在本仓库内。 """ from __future__ import annotations import hashlib import json import os import platform import subprocess from datetime import datetime, timedelta, timezone from functools import wraps from typing import Any, Callable, Optional import requests _REPO_ROOT = os.path.dirname(os.path.abspath(__file__)) LICENSE_DIR = os.path.join(_REPO_ROOT, ".license") DEVICE_ID_FILE = os.path.join(LICENSE_DIR, "device.id") CACHE_FILE = os.path.join(LICENSE_DIR, "license.cache") _TEMPLATE_DIR = os.path.join(_REPO_ROOT, "license_templates") BJ_TZ = timezone(timedelta(hours=8)) def load_shared_env() -> None: """从仓库根目录 .env 加载 LICENSE_*(子项目 .env 已存在同名变量时不覆盖)。""" path = os.path.join(_REPO_ROOT, ".env") if not os.path.isfile(path): return try: raw_bytes = open(path, "rb").read() except OSError: return text = "" for enc in ("utf-8-sig", "utf-16", "utf-16-le", "utf-16-be"): try: text = raw_bytes.decode(enc) break except Exception: continue if not text: text = raw_bytes.decode("utf-8", errors="ignore") text = text.replace("\x00", "") for line in text.splitlines(): raw = line.strip() if not raw or raw.startswith("#") or "=" not in raw: continue key, value = raw.split("=", 1) clean_key = key.strip().lstrip("\ufeff") if not clean_key.startswith("LICENSE_"): continue if clean_key in os.environ and (os.environ.get(clean_key) or "").strip(): continue clean_value = value.strip().strip('"').strip("'") os.environ[clean_key] = clean_value def _env_bool(name: str, default: bool = False) -> bool: return (os.getenv(name) or "").strip().lower() in ("1", "true", "yes", "on") def license_disabled() -> bool: return _env_bool("LICENSE_DISABLED", False) def license_api_url() -> str: return (os.getenv("LICENSE_API_URL") or "").strip().rstrip("/") def license_client_key() -> str: return (os.getenv("LICENSE_CLIENT_KEY") or "").strip() def check_interval_days() -> int: try: return max(1, int(os.getenv("LICENSE_CHECK_INTERVAL_DAYS", "3"))) except ValueError: return 3 def offline_grace_days() -> int: try: return max(1, int(os.getenv("LICENSE_OFFLINE_GRACE_DAYS", "7"))) except ValueError: return 7 def wechat_id() -> str: return (os.getenv("LICENSE_WECHAT_ID") or "dekun03").strip() def wechat_remark_hint(device_id: str) -> str: custom = (os.getenv("LICENSE_WECHAT_REMARK") or "").strip() if custom: return custom return device_id def _machine_fingerprint() -> str: parts = [ platform.node() or "", platform.system() or "", platform.release() or "", platform.machine() or "", ] if platform.system() == "Windows": try: out = subprocess.check_output( ["wmic", "csproduct", "get", "uuid"], stderr=subprocess.DEVNULL, timeout=8, text=True, encoding="utf-8", errors="ignore", ) for line in out.splitlines(): line = line.strip() if line and line.lower() != "uuid": parts.append(line) break except Exception: pass else: for path in ("/etc/machine-id", "/var/lib/dbus/machine-id"): try: if os.path.isfile(path): parts.append(open(path, encoding="utf-8", errors="ignore").read().strip()) break except Exception: pass return "|".join(parts) def get_device_id() -> str: os.makedirs(LICENSE_DIR, exist_ok=True) if os.path.isfile(DEVICE_ID_FILE): with open(DEVICE_ID_FILE, encoding="utf-8") as f: existing = f.read().strip() if existing: return existing raw = _machine_fingerprint() device_id = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32] with open(DEVICE_ID_FILE, "w", encoding="utf-8") as f: f.write(device_id) return device_id def _parse_iso(value: str | None) -> Optional[datetime]: if not value: return None s = str(value).strip() if not s: return None if s.endswith("Z"): s = s[:-1] + "+00:00" try: dt = datetime.fromisoformat(s) except ValueError: return None if dt.tzinfo is None: dt = dt.replace(tzinfo=BJ_TZ) return dt.astimezone(BJ_TZ) def _now_bj() -> datetime: return datetime.now(BJ_TZ) def _load_cache() -> Optional[dict[str, Any]]: if not os.path.isfile(CACHE_FILE): return None try: with open(CACHE_FILE, encoding="utf-8") as f: data = json.load(f) return data if isinstance(data, dict) else None except Exception: return None def _save_cache(data: dict[str, Any]) -> None: os.makedirs(LICENSE_DIR, exist_ok=True) with open(CACHE_FILE, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def _api_headers() -> dict[str, str]: headers = {"Content-Type": "application/json"} key = license_client_key() if key: headers["X-Client-Key"] = key return headers def _api_configured() -> bool: return bool(license_api_url() and license_client_key()) def redeem_code(code: str) -> tuple[bool, str]: if license_disabled(): return True, "许可校验已关闭(开发模式)" if not _api_configured(): return False, "未配置 LICENSE_API_URL / LICENSE_CLIENT_KEY" device_id = get_device_id() url = f"{license_api_url()}/v1/redeem" try: r = requests.post( url, json={"device_id": device_id, "code": (code or "").strip()}, headers=_api_headers(), timeout=30, ) body = r.json() if r.content else {} except requests.RequestException as e: return False, f"无法连接授权服务:{e}" except ValueError: return False, "授权服务返回非 JSON" if r.status_code >= 400 or not body.get("ok"): return False, (body.get("message") or body.get("detail") or f"兑换失败 HTTP {r.status_code}") expires_at = body.get("expires_at") or "" cache = { "device_id": device_id, "subscription_id": body.get("subscription_id") or "", "plan": body.get("plan") or "", "expires_at": expires_at, "last_validated_at": _now_bj().isoformat(), "last_validated_ok_at": _now_bj().isoformat(), } _save_cache(cache) return True, "激活成功" def validate_remote() -> tuple[bool, str, Optional[dict[str, Any]]]: if license_disabled(): return True, "", _load_cache() if not _api_configured(): return False, "未配置授权服务", None cache = _load_cache() if not cache or not cache.get("subscription_id"): return False, "未激活", None device_id = get_device_id() url = f"{license_api_url()}/v1/validate" try: r = requests.post( url, json={ "device_id": device_id, "subscription_id": cache.get("subscription_id"), }, headers=_api_headers(), timeout=20, ) body = r.json() if r.content else {} except requests.RequestException as e: return False, f"校验请求失败:{e}", cache except ValueError: return False, "授权服务返回非 JSON", cache now_iso = _now_bj().isoformat() cache["last_validated_at"] = now_iso if r.status_code >= 400 or not body.get("ok"): _save_cache(cache) return False, body.get("message") or body.get("detail") or "校验失败", cache if not body.get("valid"): reason = body.get("reason") or body.get("message") or "许可无效" _save_cache(cache) return False, reason, cache if body.get("expires_at"): cache["expires_at"] = body["expires_at"] if body.get("plan"): cache["plan"] = body["plan"] cache["last_validated_ok_at"] = now_iso _save_cache(cache) return True, "", cache def _needs_periodic_check(last_validated_at: str | None) -> bool: if not last_validated_at: return True last = _parse_iso(last_validated_at) if not last: return True return _now_bj() - last >= timedelta(days=check_interval_days()) def _within_offline_grace(cache: dict[str, Any]) -> bool: ref = cache.get("last_validated_ok_at") or cache.get("last_validated_at") last_ok = _parse_iso(ref if isinstance(ref, str) else None) if not last_ok: return False return _now_bj() - last_ok <= timedelta(days=offline_grace_days()) def _is_expired(cache: dict[str, Any]) -> bool: exp = _parse_iso(cache.get("expires_at")) if not exp: return True return _now_bj() > exp def is_licensed(*, force_validate: bool = False) -> bool: if license_disabled(): return True cache = _load_cache() if not cache: return False if cache.get("device_id") and cache.get("device_id") != get_device_id(): return False if _is_expired(cache): return False if force_validate or _needs_periodic_check(cache.get("last_validated_at")): ok, _msg, updated = validate_remote() if ok: return True use_cache = updated or cache if _within_offline_grace(use_cache) and not _is_expired(use_cache): return True return False return True def license_status() -> dict[str, Any]: device_id = get_device_id() cache = _load_cache() or {} exp = _parse_iso(cache.get("expires_at")) licensed = is_licensed() days_left = None if exp: days_left = max(0, (exp - _now_bj()).days) return { "device_id": device_id, "licensed": licensed, "disabled": license_disabled(), "api_configured": _api_configured(), "subscription_id": cache.get("subscription_id") or "", "plan": cache.get("plan") or "", "expires_at": cache.get("expires_at") or "", "expires_at_display": exp.strftime("%Y-%m-%d %H:%M") if exp else "", "days_left": days_left, "last_validated_at": cache.get("last_validated_at") or "", "check_interval_days": check_interval_days(), "offline_grace_days": offline_grace_days(), "wechat_id": wechat_id(), "wechat_remark": wechat_remark_hint(device_id), "plans": [ {"id": "monthly", "name": "月卡", "price": 199, "days": 30}, {"id": "quarterly", "name": "季卡", "price": 399, "days": 90}, {"id": "yearly", "name": "年卡", "price": 699, "days": 365}, ], } def _render_license_html(ctx: dict[str, Any]) -> str: path = os.path.join(_TEMPLATE_DIR, "license.html") with open(path, encoding="utf-8") as f: template = f.read() from jinja2 import Template return Template(template).render(**ctx) def init_flask_app(app, exchange_display: str = "交易系统") -> None: """注册 /license 路由与全局门禁。""" load_shared_env() @app.route("/license", methods=["GET", "POST"]) def license_page(): from flask import flash, get_flashed_messages, redirect, render_template_string, request, url_for if request.method == "POST": code = (request.form.get("activation_code") or "").strip() if not code: flash("请输入激活码") else: ok, msg = redeem_code(code) if ok: flash(msg or "激活成功", "ok") return redirect(url_for("license_page")) flash(msg or "激活失败") return redirect(url_for("license_page")) status = license_status() html = _render_license_html( { **status, "exchange_display": exchange_display, "messages": get_flashed_messages(with_categories=True), } ) return render_template_string(html) @app.before_request def _license_gate(): from flask import redirect, request, url_for if license_disabled(): return None endpoint = request.endpoint or "" if endpoint in ("license_page", "login", "logout") or endpoint.startswith("static"): return None if not is_licensed(): return redirect(url_for("license_page")) return None def init_fastapi_app(app) -> None: from fastapi import Request from fastapi.responses import HTMLResponse, RedirectResponse load_shared_env() @app.get("/license", response_class=HTMLResponse) async def license_page(request: Request): return HTMLResponse(_render_license_html({**license_status(), "exchange_display": "中控", "messages": []})) @app.middleware("http") async def _license_gate(request: Request, call_next): if license_disabled(): return await call_next(request) path = request.url.path if path == "/license" or path.startswith("/assets"): return await call_next(request) if not is_licensed(): return RedirectResponse(url="/license", status_code=302) return await call_next(request)