Files
2026-05-27 07:34:34 +08:00

281 lines
9.2 KiB
Python

"""加密存储、旧版 data.json 迁移"""
import json
import uuid
from pathlib import Path
from cryptography.fernet import Fernet, InvalidToken
from env_config import get_encryption_key
# Directory that contains this module; the data and settings files live next to it.
BASE_DIR = Path(__file__).resolve().parent
# Record store: either a legacy plain JSON list or the encrypted {"v": 2, "payload": ...} envelope.
DATA_FILE = BASE_DIR / "data.json"
# Settings file; holds the user-defined credential types under "custom_types".
SETTINGS_FILE = BASE_DIR / "settings.json"
# Exchange identifiers accepted when validating a record of type "exchange".
VALID_EXCHANGES = frozenset({"binance", "okx", "gate"})
# Built-in credential type definitions.
#
# Each type is a dict with:
#   "id"      - identifier stored in a record's "type_id"
#   "label"   - display label (runtime string, kept in Chinese)
#   "builtin" - True for every entry in this list
#   "fields"  - ordered list of field definitions
#
# Each field definition has "key", "label" and "type", and optionally:
#   "required" - the value must be non-empty during validation
#   "options"  - allowed choices listed for a "select" field
#   "when"     - the field only applies when the named input fields have
#                the given values (evaluated by _field_applies)
BUILTIN_TYPES = [
    # Exchange API credentials; the password field applies only when exchange is "okx".
    {
        "id": "exchange",
        "label": "交易所 API",
        "builtin": True,
        "fields": [
            {"key": "exchange", "label": "交易所", "type": "select", "options": ["binance", "okx", "gate"]},
            {"key": "username", "label": "账户名称", "type": "text", "required": True},
            {"key": "api_key", "label": "API Key", "type": "secret", "required": True},
            {"key": "api_secret", "label": "API Secret", "type": "secret", "required": True},
            {"key": "password", "label": "OKX 密码", "type": "secret", "required": True, "when": {"exchange": "okx"}},
        ],
    },
    # Website account: title, URL, username and password are required; note is optional.
    {
        "id": "website",
        "label": "网站账号",
        "builtin": True,
        "fields": [
            {"key": "title", "label": "名称", "type": "text", "required": True},
            {"key": "url", "label": "网址", "type": "url", "required": True},
            {"key": "username", "label": "用户名", "type": "text", "required": True},
            {"key": "password", "label": "密码", "type": "secret", "required": True},
            {"key": "note", "label": "备注", "type": "text"},
        ],
    },
    # E-mail account: title, address and password are required.
    {
        "id": "email",
        "label": "邮箱账户",
        "builtin": True,
        "fields": [
            {"key": "title", "label": "名称", "type": "text", "required": True},
            {"key": "email", "label": "邮箱地址", "type": "email", "required": True},
            {"key": "username", "label": "用户名", "type": "text"},
            {"key": "password", "label": "密码", "type": "secret", "required": True},
            {"key": "web_url", "label": "网页登录地址", "type": "url"},
            {"key": "note", "label": "备注", "type": "text"},
        ],
    },
    # WeCom (enterprise WeChat) bot: the webhook is stored as a secret.
    {
        "id": "wecom_bot",
        "label": "企业微信机器人",
        "builtin": True,
        "fields": [
            {"key": "title", "label": "名称", "type": "text", "required": True},
            {"key": "webhook", "label": "Webhook", "type": "secret", "required": True},
            {"key": "note", "label": "备注", "type": "text"},
        ],
    },
    # DingTalk bot: same shape as the WeCom bot type.
    {
        "id": "dingtalk_bot",
        "label": "钉钉机器人",
        "builtin": True,
        "fields": [
            {"key": "title", "label": "名称", "type": "text", "required": True},
            {"key": "webhook", "label": "Webhook", "type": "secret", "required": True},
            {"key": "note", "label": "备注", "type": "text"},
        ],
    },
    # WeChat account: title, WeChat ID and phone are required; password is optional.
    {
        "id": "wechat",
        "label": "微信",
        "builtin": True,
        "fields": [
            {"key": "title", "label": "名称", "type": "text", "required": True},
            {"key": "wechat_id", "label": "微信号", "type": "text", "required": True},
            {"key": "phone", "label": "手机号", "type": "phone", "required": True},
            {"key": "password", "label": "密码", "type": "secret"},
            {"key": "note", "label": "备注", "type": "text"},
        ],
    },
    # QQ account: title, QQ number and phone are required; password is optional.
    {
        "id": "qq",
        "label": "QQ",
        "builtin": True,
        "fields": [
            {"key": "title", "label": "名称", "type": "text", "required": True},
            {"key": "qq_number", "label": "QQ号", "type": "text", "required": True},
            {"key": "phone", "label": "手机号", "type": "phone", "required": True},
            {"key": "password", "label": "密码", "type": "secret"},
            {"key": "note", "label": "备注", "type": "text"},
        ],
    },
]
def _fernet():
    """Build a Fernet cipher from the key returned by ``get_encryption_key``."""
    key = get_encryption_key()
    return Fernet(key)
def default_settings():
    """Return a fresh settings mapping that declares no custom credential types."""
    return dict(custom_types=[])
def load_settings():
    """Load the settings mapping from ``SETTINGS_FILE``.

    When the file does not exist, the defaults are written to disk and
    returned. When the file exists but cannot be read, decoded or parsed,
    or its top-level JSON value is not an object, the defaults are
    returned without touching the file.

    Returns:
        A dict that always contains a list under ``"custom_types"``.
    """
    if not SETTINGS_FILE.exists():
        settings = default_settings()
        save_settings(settings)
        return settings
    try:
        with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError, OSError):
        return default_settings()
    # Valid JSON that is not an object (e.g. a list) cannot carry settings
    # keys; treat it like any other unusable file instead of raising later.
    if not isinstance(data, dict):
        return default_settings()
    # Callers concatenate this value with a list, so a missing or non-list
    # value is replaced by an empty list.
    if not isinstance(data.get("custom_types"), list):
        data["custom_types"] = []
    return data
def save_settings(data):
    """Write ``data`` to ``SETTINGS_FILE`` as indented UTF-8 JSON.

    Args:
        data: JSON-serializable settings mapping.

    Raises:
        TypeError / ValueError: if ``data`` cannot be serialized; in that
            case the existing file is left untouched.
        OSError: if the file cannot be opened or written.
    """
    # Serialize before opening: mode "w" truncates immediately, so a
    # serialization failure after opening would destroy the previous settings.
    text = json.dumps(data, ensure_ascii=False, indent=2)
    with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
        f.write(text)
def all_type_defs():
    """Return the built-in type definitions followed by the custom ones from settings."""
    custom_types = load_settings().get("custom_types", [])
    return BUILTIN_TYPES + custom_types
def get_type_def(type_id: str):
    """Return the first type definition whose ``"id"`` equals ``type_id``, or None."""
    matching = (type_def for type_def in all_type_defs() if type_def["id"] == type_id)
    return next(matching, None)
def _legacy_to_record(item: dict) -> dict:
exchange = (item.get("exchange") or "binance").lower()
fields = {
"exchange": exchange,
"username": item.get("username", ""),
"api_key": item.get("api_key", ""),
"api_secret": item.get("api_secret", ""),
}
if item.get("password"):
fields["password"] = item["password"]
title = fields.get("username") or exchange
return {
"id": item.get("id") or str(uuid.uuid4()),
"type_id": "exchange",
"title": title,
"fields": fields,
}
def _read_raw_file():
    """Return the parsed JSON content of ``DATA_FILE``, or None when the file is absent."""
    if DATA_FILE.exists():
        with DATA_FILE.open("r", encoding="utf-8") as handle:
            return json.load(handle)
    return None
def _migrate_plain_list(items: list) -> list:
records = []
for item in items:
if isinstance(item, dict) and item.get("type_id"):
records.append(item)
elif isinstance(item, dict) and item.get("exchange"):
records.append(_legacy_to_record(item))
elif isinstance(item, dict):
records.append(item)
return records
def load_records():
    """Load all credential records from ``DATA_FILE``.

    Handles two on-disk formats:

    * a legacy plain JSON list, which is migrated and immediately re-saved
      through ``save_records`` (i.e. rewritten in the encrypted format);
    * the ``{"v": 2, "payload": <token>}`` envelope, whose payload is
      decrypted with the configured key and parsed as JSON.

    Returns:
        The list of records. An empty list is returned when the file is
        missing, has an unrecognized shape, has an empty or non-string
        payload, cannot be decrypted or decoded, or decrypts to something
        other than a JSON list.
    """
    raw = _read_raw_file()
    if isinstance(raw, list):
        records = _migrate_plain_list(raw)
        save_records(records)
        return records
    if not isinstance(raw, dict) or raw.get("v") != 2:
        return []
    token = raw.get("payload", "")
    # A payload that is not a string (e.g. a number from a hand-edited file)
    # is as unusable as an empty one; without this guard .encode() would raise.
    if not isinstance(token, str) or not token:
        return []
    try:
        plain = _fernet().decrypt(token.encode()).decode()
        data = json.loads(plain)
    except (InvalidToken, json.JSONDecodeError, UnicodeDecodeError):
        return []
    return data if isinstance(data, list) else []
def save_records(records: list):
    """Encrypt ``records`` and write them to ``DATA_FILE`` in the v2 envelope.

    The records are serialized to JSON and encrypted before the file is
    opened; the file then receives ``{"v": 2, "payload": <token>}``.
    """
    serialized = json.dumps(records, ensure_ascii=False)
    ciphertext = _fernet().encrypt(serialized.encode())
    envelope = {"v": 2, "payload": ciphertext.decode()}
    with DATA_FILE.open("w", encoding="utf-8") as out:
        json.dump(envelope, out, ensure_ascii=False, indent=2)
def record_title(type_id: str, fields: dict) -> str:
    """Derive a display title for a record from its fields.

    The first non-empty value among ``title``, ``username``, ``email``,
    ``wechat_id`` and ``qq_number`` wins. Otherwise an ``"exchange"`` record
    falls back to "<exchange> <username>" (trimmed), and everything else to
    the placeholder "未命名".
    """
    placeholder = "未命名"
    preferred_keys = ("title", "username", "email", "wechat_id", "qq_number")

    candidates = (fields.get(name) for name in preferred_keys)
    chosen = next((value for value in candidates if value), None)
    if chosen is not None:
        return str(chosen)

    if type_id != "exchange":
        return placeholder

    exchange_name = fields.get("exchange", "")
    account_name = fields.get("username", "")
    combined = f"{exchange_name} {account_name}".strip()
    return combined or placeholder
def _field_applies(fd: dict, fields_in: dict) -> bool:
when = fd.get("when")
if not when:
return True
return all(str(fields_in.get(k, "")).strip() == str(v) for k, v in when.items())
def validate_record_payload(payload: dict):
    """Validate a submitted record and normalize it for storage.

    Only fields declared by the record's type definition are kept; each
    value is converted to a stripped string (``None`` becomes ``""``).
    Fields whose ``"when"`` condition does not match are skipped. For the
    ``"exchange"`` type the exchange name is lower-cased before anything
    else, so conditional fields and the stored value agree with the
    case-insensitive exchange check.

    Args:
        payload: Mapping with ``"type_id"`` and a ``"fields"`` mapping.

    Returns:
        ``(record, None)`` on success, where ``record`` has ``"type_id"``,
        ``"title"`` and ``"fields"``; ``(None, message)`` on failure, where
        ``message`` is a user-facing error string.
    """
    raw_type_id = payload.get("type_id")
    # A non-string type_id cannot name a type; treat it as missing rather
    # than letting .strip() raise.
    type_id = raw_type_id.strip() if isinstance(raw_type_id, str) else ""
    type_def = get_type_def(type_id)
    if not type_def:
        return None, "无效的凭证类型"

    fields_in = payload.get("fields") or {}
    if not isinstance(fields_in, dict):
        return None, "字段格式错误"

    if type_id == "exchange":
        # Normalize the exchange name up front. The "when" conditions are
        # compared case-sensitively, so without this an input of "OKX" would
        # skip the password field and then be rejected for lacking it.
        # Work on a copy so the caller's mapping is not mutated.
        fields_in = dict(fields_in)
        fields_in["exchange"] = str(fields_in.get("exchange") or "").strip().lower()

    fields = {}
    for fd in type_def.get("fields", []):
        if not _field_applies(fd, fields_in):
            continue
        key = fd["key"]
        val = fields_in.get(key)
        val = "" if val is None else str(val).strip()
        if fd.get("required") and not val:
            return None, f"{fd.get('label', key)} 不能为空"
        fields[key] = val

    if type_id == "exchange":
        ex = fields.get("exchange", "")
        if ex not in VALID_EXCHANGES:
            return None, "无效交易所"
        # Re-checked independently of the type definition's "required" flags.
        for req in ("username", "api_key", "api_secret"):
            if not fields.get(req):
                return None, f"{req} 不能为空"
        if ex == "okx" and not fields.get("password"):
            return None, "OKX 密码不能为空"

    return {
        "type_id": type_id,
        "title": record_title(type_id, fields),
        "fields": fields,
    }, None
def filter_records(records, type_id=None, q=None):
    """Filter records by type and/or a case-insensitive text query.

    Args:
        records: Sequence of record dicts.
        type_id: When truthy, keep only records with this ``"type_id"``.
        q: When truthy, keep only records whose JSON serialization (keys
            and values alike) contains ``q``, ignoring case.

    Returns:
        The filtered list; when neither filter is given, ``records`` itself.
    """
    matches = records
    if type_id:
        matches = [rec for rec in matches if rec.get("type_id") == type_id]
    if q:
        needle = q.lower()
        matches = [
            rec
            for rec in matches
            if needle in json.dumps(rec, ensure_ascii=False).lower()
        ]
    return matches