"""加密存储、旧版 data.json 迁移""" import json import uuid from pathlib import Path from cryptography.fernet import Fernet, InvalidToken from env_config import get_encryption_key BASE_DIR = Path(__file__).resolve().parent DATA_FILE = BASE_DIR / "data.json" SETTINGS_FILE = BASE_DIR / "settings.json" VALID_EXCHANGES = frozenset({"binance", "okx", "gate"}) BUILTIN_TYPES = [ { "id": "exchange", "label": "交易所 API", "builtin": True, "fields": [ {"key": "exchange", "label": "交易所", "type": "select", "options": ["binance", "okx", "gate"]}, {"key": "username", "label": "账户名称", "type": "text", "required": True}, {"key": "api_key", "label": "API Key", "type": "secret", "required": True}, {"key": "api_secret", "label": "API Secret", "type": "secret", "required": True}, {"key": "password", "label": "OKX 密码", "type": "secret", "required": True, "when": {"exchange": "okx"}}, ], }, { "id": "website", "label": "网站账号", "builtin": True, "fields": [ {"key": "title", "label": "名称", "type": "text", "required": True}, {"key": "url", "label": "网址", "type": "url", "required": True}, {"key": "username", "label": "用户名", "type": "text", "required": True}, {"key": "password", "label": "密码", "type": "secret", "required": True}, {"key": "note", "label": "备注", "type": "text"}, ], }, { "id": "email", "label": "邮箱账户", "builtin": True, "fields": [ {"key": "title", "label": "名称", "type": "text", "required": True}, {"key": "email", "label": "邮箱地址", "type": "email", "required": True}, {"key": "username", "label": "用户名", "type": "text"}, {"key": "password", "label": "密码", "type": "secret", "required": True}, {"key": "web_url", "label": "网页登录地址", "type": "url"}, {"key": "note", "label": "备注", "type": "text"}, ], }, { "id": "wecom_bot", "label": "企业微信机器人", "builtin": True, "fields": [ {"key": "title", "label": "名称", "type": "text", "required": True}, {"key": "webhook", "label": "Webhook", "type": "secret", "required": True}, {"key": "note", "label": "备注", "type": "text"}, ], }, { "id": "dingtalk_bot", "label": "钉钉机器人", "builtin": True, "fields": [ {"key": "title", "label": "名称", "type": "text", "required": True}, {"key": "webhook", "label": "Webhook", "type": "secret", "required": True}, {"key": "note", "label": "备注", "type": "text"}, ], }, { "id": "wechat", "label": "微信", "builtin": True, "fields": [ {"key": "title", "label": "名称", "type": "text", "required": True}, {"key": "wechat_id", "label": "微信号", "type": "text", "required": True}, {"key": "phone", "label": "手机号", "type": "phone", "required": True}, {"key": "password", "label": "密码", "type": "secret"}, {"key": "note", "label": "备注", "type": "text"}, ], }, { "id": "qq", "label": "QQ", "builtin": True, "fields": [ {"key": "title", "label": "名称", "type": "text", "required": True}, {"key": "qq_number", "label": "QQ号", "type": "text", "required": True}, {"key": "phone", "label": "手机号", "type": "phone", "required": True}, {"key": "password", "label": "密码", "type": "secret"}, {"key": "note", "label": "备注", "type": "text"}, ], }, ] def _fernet(): return Fernet(get_encryption_key()) def default_settings(): return {"custom_types": []} def load_settings(): if not SETTINGS_FILE.exists(): s = default_settings() save_settings(s) return s try: with open(SETTINGS_FILE, "r", encoding="utf-8") as f: data = json.load(f) if "custom_types" not in data: data["custom_types"] = [] return data except (json.JSONDecodeError, OSError): return default_settings() def save_settings(data): with open(SETTINGS_FILE, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) def all_type_defs(): settings = load_settings() return BUILTIN_TYPES + settings.get("custom_types", []) def get_type_def(type_id: str): for t in all_type_defs(): if t["id"] == type_id: return t return None def _legacy_to_record(item: dict) -> dict: exchange = (item.get("exchange") or "binance").lower() fields = { "exchange": exchange, "username": item.get("username", ""), "api_key": item.get("api_key", ""), "api_secret": item.get("api_secret", ""), } if item.get("password"): fields["password"] = item["password"] title = fields.get("username") or exchange return { "id": item.get("id") or str(uuid.uuid4()), "type_id": "exchange", "title": title, "fields": fields, } def _read_raw_file(): if not DATA_FILE.exists(): return None with open(DATA_FILE, "r", encoding="utf-8") as f: return json.load(f) def _migrate_plain_list(items: list) -> list: records = [] for item in items: if isinstance(item, dict) and item.get("type_id"): records.append(item) elif isinstance(item, dict) and item.get("exchange"): records.append(_legacy_to_record(item)) elif isinstance(item, dict): records.append(item) return records def load_records(): raw = _read_raw_file() if raw is None: return [] if isinstance(raw, list): records = _migrate_plain_list(raw) save_records(records) return records if isinstance(raw, dict) and raw.get("v") == 2: token = raw.get("payload", "") if not token: return [] try: plain = _fernet().decrypt(token.encode()).decode() data = json.loads(plain) return data if isinstance(data, list) else [] except (InvalidToken, json.JSONDecodeError): return [] return [] def save_records(records: list): plain = json.dumps(records, ensure_ascii=False).encode() token = _fernet().encrypt(plain).decode() payload = {"v": 2, "payload": token} with open(DATA_FILE, "w", encoding="utf-8") as f: json.dump(payload, f, ensure_ascii=False, indent=2) def record_title(type_id: str, fields: dict) -> str: for key in ("title", "username", "email", "wechat_id", "qq_number"): if fields.get(key): return str(fields[key]) if type_id == "exchange": ex = fields.get("exchange", "") un = fields.get("username", "") return f"{ex} {un}".strip() or "未命名" return "未命名" def _field_applies(fd: dict, fields_in: dict) -> bool: when = fd.get("when") if not when: return True return all(str(fields_in.get(k, "")).strip() == str(v) for k, v in when.items()) def validate_record_payload(payload: dict): type_id = (payload.get("type_id") or "").strip() type_def = get_type_def(type_id) if not type_def: return None, "无效的凭证类型" fields_in = payload.get("fields") or {} if not isinstance(fields_in, dict): return None, "字段格式错误" fields = {} for fd in type_def.get("fields", []): if not _field_applies(fd, fields_in): continue key = fd["key"] val = fields_in.get(key) if val is None: val = "" val = str(val).strip() if not isinstance(val, str) else val.strip() if fd.get("required") and not val: return None, f"{fd.get('label', key)} 不能为空" fields[key] = val if type_id == "exchange": ex = fields.get("exchange", "").lower() if ex not in VALID_EXCHANGES: return None, "无效交易所" for req in ("username", "api_key", "api_secret"): if not fields.get(req): return None, f"{req} 不能为空" if ex == "okx" and not fields.get("password"): return None, "OKX 密码不能为空" return { "type_id": type_id, "title": record_title(type_id, fields), "fields": fields, }, None def filter_records(records, type_id=None, q=None): out = records if type_id: out = [r for r in out if r.get("type_id") == type_id] if q: q = q.lower() filtered = [] for r in out: blob = json.dumps(r, ensure_ascii=False).lower() if q in blob: filtered.append(r) out = filtered return out