452 lines
14 KiB
Python
452 lines
14 KiB
Python
"""
|
|
整机许可客户端:设备 ID、激活码兑换、定期云端校验。
|
|
签发与管理 Web 为独立云端服务,不在本仓库内。
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import platform
|
|
import subprocess
|
|
from datetime import datetime, timedelta, timezone
|
|
from functools import wraps
|
|
from typing import Any, Callable, Optional
|
|
|
|
import requests
|
|
|
|
_REPO_ROOT = os.path.dirname(os.path.abspath(__file__))
|
|
LICENSE_DIR = os.path.join(_REPO_ROOT, ".license")
|
|
DEVICE_ID_FILE = os.path.join(LICENSE_DIR, "device.id")
|
|
CACHE_FILE = os.path.join(LICENSE_DIR, "license.cache")
|
|
_TEMPLATE_DIR = os.path.join(_REPO_ROOT, "license_templates")
|
|
|
|
BJ_TZ = timezone(timedelta(hours=8))
|
|
|
|
|
|
def _apply_env_lines(text: str, *, license_only: bool, override: bool) -> None:
|
|
text = text.replace("\x00", "")
|
|
for line in text.splitlines():
|
|
raw = line.strip()
|
|
if not raw or raw.startswith("#") or "=" not in raw:
|
|
continue
|
|
key, value = raw.split("=", 1)
|
|
clean_key = key.strip().lstrip("\ufeff")
|
|
if license_only and not clean_key.startswith("LICENSE_"):
|
|
continue
|
|
clean_value = value.strip().strip('"').strip("'")
|
|
if not clean_value:
|
|
continue
|
|
if not override and clean_key in os.environ and (os.environ.get(clean_key) or "").strip():
|
|
continue
|
|
os.environ[clean_key] = clean_value
|
|
|
|
|
|
def _read_env_file(path: str) -> str:
|
|
if not os.path.isfile(path):
|
|
return ""
|
|
try:
|
|
raw_bytes = open(path, "rb").read()
|
|
except OSError:
|
|
return ""
|
|
for enc in ("utf-8-sig", "utf-16", "utf-16-le", "utf-16-be"):
|
|
try:
|
|
return raw_bytes.decode(enc)
|
|
except Exception:
|
|
continue
|
|
return raw_bytes.decode("utf-8", errors="ignore")
|
|
|
|
|
|
def load_shared_env() -> None:
|
|
"""
|
|
加载顺序(后加载的可补空位):
|
|
1. 仓库根 license.env — 默认授权地址 https://sq.bz121.com
|
|
2. 仓库根 .env 中的 LICENSE_* — 可覆盖密钥等(勿提交 Git)
|
|
子目录 .env 若已先加载且已有 LICENSE_*,则不被覆盖。
|
|
"""
|
|
license_path = os.path.join(_REPO_ROOT, "license.env")
|
|
_apply_env_lines(_read_env_file(license_path), license_only=True, override=True)
|
|
|
|
root_env = os.path.join(_REPO_ROOT, ".env")
|
|
_apply_env_lines(_read_env_file(root_env), license_only=True, override=True)
|
|
|
|
|
|
def _env_bool(name: str, default: bool = False) -> bool:
|
|
return (os.getenv(name) or "").strip().lower() in ("1", "true", "yes", "on")
|
|
|
|
|
|
def license_disabled() -> bool:
|
|
return _env_bool("LICENSE_DISABLED", False)
|
|
|
|
|
|
def license_api_url() -> str:
|
|
return (os.getenv("LICENSE_API_URL") or "").strip().rstrip("/")
|
|
|
|
|
|
def license_client_key() -> str:
|
|
return (os.getenv("LICENSE_CLIENT_KEY") or "").strip()
|
|
|
|
|
|
def check_interval_days() -> int:
|
|
try:
|
|
return max(1, int(os.getenv("LICENSE_CHECK_INTERVAL_DAYS", "3")))
|
|
except ValueError:
|
|
return 3
|
|
|
|
|
|
def offline_grace_days() -> int:
|
|
try:
|
|
return max(1, int(os.getenv("LICENSE_OFFLINE_GRACE_DAYS", "7")))
|
|
except ValueError:
|
|
return 7
|
|
|
|
|
|
def wechat_id() -> str:
|
|
return (os.getenv("LICENSE_WECHAT_ID") or "dekun03").strip()
|
|
|
|
|
|
def wechat_remark_hint(device_id: str) -> str:
|
|
custom = (os.getenv("LICENSE_WECHAT_REMARK") or "").strip()
|
|
if custom:
|
|
return custom
|
|
return device_id
|
|
|
|
|
|
def _machine_fingerprint() -> str:
|
|
parts = [
|
|
platform.node() or "",
|
|
platform.system() or "",
|
|
platform.release() or "",
|
|
platform.machine() or "",
|
|
]
|
|
if platform.system() == "Windows":
|
|
try:
|
|
out = subprocess.check_output(
|
|
["wmic", "csproduct", "get", "uuid"],
|
|
stderr=subprocess.DEVNULL,
|
|
timeout=8,
|
|
text=True,
|
|
encoding="utf-8",
|
|
errors="ignore",
|
|
)
|
|
for line in out.splitlines():
|
|
line = line.strip()
|
|
if line and line.lower() != "uuid":
|
|
parts.append(line)
|
|
break
|
|
except Exception:
|
|
pass
|
|
else:
|
|
for path in ("/etc/machine-id", "/var/lib/dbus/machine-id"):
|
|
try:
|
|
if os.path.isfile(path):
|
|
parts.append(open(path, encoding="utf-8", errors="ignore").read().strip())
|
|
break
|
|
except Exception:
|
|
pass
|
|
return "|".join(parts)
|
|
|
|
|
|
def get_device_id() -> str:
|
|
os.makedirs(LICENSE_DIR, exist_ok=True)
|
|
if os.path.isfile(DEVICE_ID_FILE):
|
|
with open(DEVICE_ID_FILE, encoding="utf-8") as f:
|
|
existing = f.read().strip()
|
|
if existing:
|
|
return existing
|
|
raw = _machine_fingerprint()
|
|
device_id = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
|
|
with open(DEVICE_ID_FILE, "w", encoding="utf-8") as f:
|
|
f.write(device_id)
|
|
return device_id
|
|
|
|
|
|
def _parse_iso(value: str | None) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
s = str(value).strip()
|
|
if not s:
|
|
return None
|
|
if s.endswith("Z"):
|
|
s = s[:-1] + "+00:00"
|
|
try:
|
|
dt = datetime.fromisoformat(s)
|
|
except ValueError:
|
|
return None
|
|
if dt.tzinfo is None:
|
|
dt = dt.replace(tzinfo=BJ_TZ)
|
|
return dt.astimezone(BJ_TZ)
|
|
|
|
|
|
def _now_bj() -> datetime:
|
|
return datetime.now(BJ_TZ)
|
|
|
|
|
|
def _load_cache() -> Optional[dict[str, Any]]:
|
|
if not os.path.isfile(CACHE_FILE):
|
|
return None
|
|
try:
|
|
with open(CACHE_FILE, encoding="utf-8") as f:
|
|
data = json.load(f)
|
|
return data if isinstance(data, dict) else None
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _save_cache(data: dict[str, Any]) -> None:
|
|
os.makedirs(LICENSE_DIR, exist_ok=True)
|
|
with open(CACHE_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(data, f, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def _api_headers() -> dict[str, str]:
|
|
headers = {"Content-Type": "application/json"}
|
|
key = license_client_key()
|
|
if key:
|
|
headers["X-Client-Key"] = key
|
|
return headers
|
|
|
|
|
|
def _api_configured() -> bool:
|
|
return bool(license_api_url() and license_client_key())
|
|
|
|
|
|
def redeem_code(code: str) -> tuple[bool, str]:
|
|
if license_disabled():
|
|
return True, "许可校验已关闭(开发模式)"
|
|
if not _api_configured():
|
|
return False, "未配置 LICENSE_API_URL / LICENSE_CLIENT_KEY"
|
|
device_id = get_device_id()
|
|
url = f"{license_api_url()}/v1/redeem"
|
|
try:
|
|
r = requests.post(
|
|
url,
|
|
json={"device_id": device_id, "code": (code or "").strip()},
|
|
headers=_api_headers(),
|
|
timeout=30,
|
|
)
|
|
body = r.json() if r.content else {}
|
|
except requests.RequestException as e:
|
|
return False, f"无法连接授权服务:{e}"
|
|
except ValueError:
|
|
return False, "授权服务返回非 JSON"
|
|
|
|
if r.status_code >= 400 or not body.get("ok"):
|
|
return False, (body.get("message") or body.get("detail") or f"兑换失败 HTTP {r.status_code}")
|
|
|
|
expires_at = body.get("expires_at") or ""
|
|
cache = {
|
|
"device_id": device_id,
|
|
"subscription_id": body.get("subscription_id") or "",
|
|
"plan": body.get("plan") or "",
|
|
"expires_at": expires_at,
|
|
"last_validated_at": _now_bj().isoformat(),
|
|
"last_validated_ok_at": _now_bj().isoformat(),
|
|
}
|
|
_save_cache(cache)
|
|
return True, "激活成功"
|
|
|
|
|
|
def validate_remote() -> tuple[bool, str, Optional[dict[str, Any]]]:
|
|
if license_disabled():
|
|
return True, "", _load_cache()
|
|
if not _api_configured():
|
|
return False, "未配置授权服务", None
|
|
|
|
cache = _load_cache()
|
|
if not cache or not cache.get("subscription_id"):
|
|
return False, "未激活", None
|
|
|
|
device_id = get_device_id()
|
|
url = f"{license_api_url()}/v1/validate"
|
|
try:
|
|
r = requests.post(
|
|
url,
|
|
json={
|
|
"device_id": device_id,
|
|
"subscription_id": cache.get("subscription_id"),
|
|
},
|
|
headers=_api_headers(),
|
|
timeout=20,
|
|
)
|
|
body = r.json() if r.content else {}
|
|
except requests.RequestException as e:
|
|
return False, f"校验请求失败:{e}", cache
|
|
except ValueError:
|
|
return False, "授权服务返回非 JSON", cache
|
|
|
|
now_iso = _now_bj().isoformat()
|
|
cache["last_validated_at"] = now_iso
|
|
|
|
if r.status_code >= 400 or not body.get("ok"):
|
|
_save_cache(cache)
|
|
return False, body.get("message") or body.get("detail") or "校验失败", cache
|
|
|
|
if not body.get("valid"):
|
|
reason = body.get("reason") or body.get("message") or "许可无效"
|
|
_save_cache(cache)
|
|
return False, reason, cache
|
|
|
|
if body.get("expires_at"):
|
|
cache["expires_at"] = body["expires_at"]
|
|
if body.get("plan"):
|
|
cache["plan"] = body["plan"]
|
|
cache["last_validated_ok_at"] = now_iso
|
|
_save_cache(cache)
|
|
return True, "", cache
|
|
|
|
|
|
def _needs_periodic_check(last_validated_at: str | None) -> bool:
|
|
if not last_validated_at:
|
|
return True
|
|
last = _parse_iso(last_validated_at)
|
|
if not last:
|
|
return True
|
|
return _now_bj() - last >= timedelta(days=check_interval_days())
|
|
|
|
|
|
def _within_offline_grace(cache: dict[str, Any]) -> bool:
|
|
ref = cache.get("last_validated_ok_at") or cache.get("last_validated_at")
|
|
last_ok = _parse_iso(ref if isinstance(ref, str) else None)
|
|
if not last_ok:
|
|
return False
|
|
return _now_bj() - last_ok <= timedelta(days=offline_grace_days())
|
|
|
|
|
|
def _is_expired(cache: dict[str, Any]) -> bool:
|
|
exp = _parse_iso(cache.get("expires_at"))
|
|
if not exp:
|
|
return True
|
|
return _now_bj() > exp
|
|
|
|
|
|
def is_licensed(*, force_validate: bool = False) -> bool:
|
|
if license_disabled():
|
|
return True
|
|
|
|
cache = _load_cache()
|
|
if not cache:
|
|
return False
|
|
if cache.get("device_id") and cache.get("device_id") != get_device_id():
|
|
return False
|
|
if _is_expired(cache):
|
|
return False
|
|
|
|
if force_validate or _needs_periodic_check(cache.get("last_validated_at")):
|
|
ok, _msg, updated = validate_remote()
|
|
if ok:
|
|
return True
|
|
use_cache = updated or cache
|
|
if _within_offline_grace(use_cache) and not _is_expired(use_cache):
|
|
return True
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
def license_status() -> dict[str, Any]:
|
|
device_id = get_device_id()
|
|
cache = _load_cache() or {}
|
|
exp = _parse_iso(cache.get("expires_at"))
|
|
licensed = is_licensed()
|
|
days_left = None
|
|
if exp:
|
|
days_left = max(0, (exp - _now_bj()).days)
|
|
|
|
return {
|
|
"device_id": device_id,
|
|
"licensed": licensed,
|
|
"disabled": license_disabled(),
|
|
"api_configured": _api_configured(),
|
|
"subscription_id": cache.get("subscription_id") or "",
|
|
"plan": cache.get("plan") or "",
|
|
"expires_at": cache.get("expires_at") or "",
|
|
"expires_at_display": exp.strftime("%Y-%m-%d %H:%M") if exp else "",
|
|
"days_left": days_left,
|
|
"last_validated_at": cache.get("last_validated_at") or "",
|
|
"check_interval_days": check_interval_days(),
|
|
"offline_grace_days": offline_grace_days(),
|
|
"wechat_id": wechat_id(),
|
|
"wechat_remark": wechat_remark_hint(device_id),
|
|
"plans": [
|
|
{"id": "monthly", "name": "月卡", "price": 199, "days": 30},
|
|
{"id": "quarterly", "name": "季卡", "price": 399, "days": 90},
|
|
{"id": "yearly", "name": "年卡", "price": 699, "days": 365},
|
|
],
|
|
}
|
|
|
|
|
|
def _render_license_html(ctx: dict[str, Any]) -> str:
|
|
path = os.path.join(_TEMPLATE_DIR, "license.html")
|
|
with open(path, encoding="utf-8") as f:
|
|
template = f.read()
|
|
from jinja2 import Template
|
|
|
|
return Template(template).render(**ctx)
|
|
|
|
|
|
def init_flask_app(app, exchange_display: str = "交易系统") -> None:
|
|
"""注册 /license 路由与全局门禁。"""
|
|
load_shared_env()
|
|
|
|
@app.route("/license", methods=["GET", "POST"])
|
|
def license_page():
|
|
from flask import flash, get_flashed_messages, redirect, render_template_string, request, url_for
|
|
|
|
if request.method == "POST":
|
|
code = (request.form.get("activation_code") or "").strip()
|
|
if not code:
|
|
flash("请输入激活码")
|
|
else:
|
|
ok, msg = redeem_code(code)
|
|
if ok:
|
|
flash(msg or "激活成功", "ok")
|
|
return redirect(url_for("license_page"))
|
|
flash(msg or "激活失败")
|
|
return redirect(url_for("license_page"))
|
|
|
|
status = license_status()
|
|
html = _render_license_html(
|
|
{
|
|
**status,
|
|
"exchange_display": exchange_display,
|
|
"messages": get_flashed_messages(with_categories=True),
|
|
}
|
|
)
|
|
return render_template_string(html)
|
|
|
|
@app.before_request
|
|
def _license_gate():
|
|
from flask import redirect, request, url_for
|
|
|
|
if license_disabled():
|
|
return None
|
|
endpoint = request.endpoint or ""
|
|
if endpoint in ("license_page", "login", "logout") or endpoint.startswith("static"):
|
|
return None
|
|
if not is_licensed():
|
|
return redirect(url_for("license_page"))
|
|
return None
|
|
|
|
|
|
def init_fastapi_app(app) -> None:
|
|
from fastapi import Request
|
|
from fastapi.responses import HTMLResponse, RedirectResponse
|
|
|
|
load_shared_env()
|
|
|
|
@app.get("/license", response_class=HTMLResponse)
|
|
async def license_page(request: Request):
|
|
return HTMLResponse(_render_license_html({**license_status(), "exchange_display": "中控", "messages": []}))
|
|
|
|
@app.middleware("http")
|
|
async def _license_gate(request: Request, call_next):
|
|
if license_disabled():
|
|
return await call_next(request)
|
|
path = request.url.path
|
|
if path == "/license" or path.startswith("/assets"):
|
|
return await call_next(request)
|
|
if not is_licensed():
|
|
return RedirectResponse(url="/license", status_code=302)
|
|
return await call_next(request)
|