增加多账户
This commit is contained in:
@@ -0,0 +1,5 @@
|
||||
# 复制为 .env 后修改(勿提交 .env 到 Git)
|
||||
SECRET_KEY=请替换为随机长字符串
|
||||
AUTH_USERNAME=admin
|
||||
AUTH_PASSWORD=请设置强密码
|
||||
DATA_ENCRYPTION_KEY=请运行 python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"
|
||||
@@ -1,4 +1,6 @@
|
||||
venv/
|
||||
logs/*.log
|
||||
data.json
|
||||
settings.json
|
||||
.env
|
||||
__pycache__/
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
# crypto_key — 币圈 API 密钥管理工具
|
||||
# crypto_key — 凭证保险库
|
||||
|
||||
本地 Web 工具,管理多个交易所/平台 API 账户(账户名称、API Key、API Secret)。数据保存在本地 `data.json`,无登录。
|
||||
本地 Web 工具,管理交易所 API、网站、邮箱、机器人、微信/QQ 及自定义类型凭证。数据 **AES 加密** 存入 `data.json`,登录凭据与加密主密钥在 `.env`。
|
||||
|
||||
| 项目 | 说明 |
|
||||
|------|------|
|
||||
@@ -13,15 +13,28 @@
|
||||
|
||||
## 功能
|
||||
|
||||
- 交易所:`binance` / `okx` / `gate`(OKX 额外保存密码 Passphrase)
|
||||
- 账户数量不限,每项含 `exchange` / `username` / `api_key` / `api_secret`
|
||||
- 添加后默认不展示列表,需选择交易所点击「确认」查询
|
||||
- **登录**:用户名 + 密码(`.env`),浏览器不缓存明文
|
||||
- **内置类型**:交易所 API、网站、邮箱、企业微信/钉钉机器人、微信、QQ
|
||||
- **自定义类型**:系统设置中添加(如小红书、抖音、快手)
|
||||
- **查询**:类型搜索 + 关键词 + 确认后显示;网站/邮箱可点击跳转
|
||||
- **旧 data.json**:首次启动自动迁移为加密格式
|
||||
- 黑色专业界面,列表展示 + 每项 3 个复制按钮(**复制始终为明文**)
|
||||
- 可选界面打码显示,不影响复制内容
|
||||
- 数据持久化至 `data.json`
|
||||
|
||||
---
|
||||
|
||||
## 首次配置
|
||||
|
||||
```bash
|
||||
cp .env.example .env
|
||||
# 编辑 .env:SECRET_KEY、AUTH_USERNAME、AUTH_PASSWORD、DATA_ENCRYPTION_KEY
|
||||
pip install -r requirements.txt
|
||||
python app.py
|
||||
```
|
||||
|
||||
默认首次运行会自动生成 `.env`(用户名 `admin` / 密码 `admin123`,请立即在「系统设置」中修改)。
|
||||
|
||||
## Ubuntu 服务器部署(/opt)
|
||||
|
||||
完整步骤见 **[DEPLOY.md](./DEPLOY.md)**。
|
||||
@@ -29,6 +42,7 @@
|
||||
```bash
|
||||
sudo git clone https://git.bz121.com/dekun/crypto_key.git /opt/crypto_key
|
||||
cd /opt/crypto_key
|
||||
cp .env.example .env # 并编辑
|
||||
sudo bash scripts/install-ubuntu.sh
|
||||
```
|
||||
|
||||
|
||||
@@ -1,67 +1,59 @@
|
||||
"""
|
||||
多账户 API 密钥管理工具 — Flask 后端
|
||||
端口: 5200 (0.0.0.0 局域网可访问) | 数据: data.json
|
||||
凭证保险库 — Flask 后端
|
||||
登录凭据与加密主密钥在 .env | 数据加密存 data.json
|
||||
"""
|
||||
import json
|
||||
import re
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
from functools import wraps
|
||||
|
||||
from flask import Flask, jsonify, request, send_from_directory
|
||||
from flask import Flask, jsonify, request, send_from_directory, session
|
||||
|
||||
import env_config
|
||||
from storage import (
|
||||
BUILTIN_TYPES,
|
||||
all_type_defs,
|
||||
filter_records,
|
||||
get_type_def,
|
||||
load_records,
|
||||
load_settings,
|
||||
save_records,
|
||||
save_settings,
|
||||
validate_record_payload,
|
||||
)
|
||||
|
||||
env_config.ensure_env()
|
||||
|
||||
app = Flask(__name__)
|
||||
app.secret_key = env_config.get_secret_key()
|
||||
app.config.update(
|
||||
SESSION_COOKIE_HTTPONLY=True,
|
||||
SESSION_COOKIE_SAMESITE="Lax",
|
||||
PERMANENT_SESSION_LIFETIME=1800,
|
||||
)
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
DATA_FILE = BASE_DIR / "data.json"
|
||||
VALID_EXCHANGES = frozenset({"binance", "okx", "gate"})
|
||||
BASE_DIR = env_config.BASE_DIR
|
||||
PUBLIC_API = {"/api/auth/login", "/api/auth/status"}
|
||||
|
||||
|
||||
def load_accounts():
|
||||
if not DATA_FILE.exists():
|
||||
return []
|
||||
try:
|
||||
with open(DATA_FILE, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
if isinstance(data, list):
|
||||
return data
|
||||
return []
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return []
|
||||
def login_required(fn):
|
||||
@wraps(fn)
|
||||
def wrapper(*args, **kwargs):
|
||||
if not session.get("authenticated"):
|
||||
return jsonify({"error": "未登录"}), 401
|
||||
return fn(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def save_accounts(accounts):
|
||||
with open(DATA_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(accounts, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def validate_account_payload(payload):
|
||||
exchange = (payload.get("exchange") or "").strip().lower()
|
||||
username = (payload.get("username") or "").strip()
|
||||
api_key = (payload.get("api_key") or "").strip()
|
||||
api_secret = (payload.get("api_secret") or "").strip()
|
||||
|
||||
if exchange not in VALID_EXCHANGES:
|
||||
return None, "请选择交易所:binance / okx / gate"
|
||||
if not username:
|
||||
return None, "账户名称不能为空"
|
||||
if not api_key:
|
||||
return None, "API Key 不能为空"
|
||||
if not api_secret:
|
||||
return None, "API Secret 不能为空"
|
||||
|
||||
account = {
|
||||
"exchange": exchange,
|
||||
"username": username,
|
||||
"api_key": api_key,
|
||||
"api_secret": api_secret,
|
||||
}
|
||||
|
||||
if exchange == "okx":
|
||||
password = (payload.get("password") or "").strip()
|
||||
if not password:
|
||||
return None, "OKX 密码(Passphrase)不能为空"
|
||||
account["password"] = password
|
||||
|
||||
return account, None
|
||||
@app.before_request
|
||||
def guard_api():
|
||||
if not request.path.startswith("/api/"):
|
||||
return None
|
||||
if request.path in PUBLIC_API:
|
||||
return None
|
||||
if not session.get("authenticated"):
|
||||
return jsonify({"error": "未登录"}), 401
|
||||
return None
|
||||
|
||||
|
||||
@app.route("/")
|
||||
@@ -69,39 +61,165 @@ def index():
|
||||
return send_from_directory(BASE_DIR, "index.html")
|
||||
|
||||
|
||||
@app.route("/api/accounts", methods=["GET"])
|
||||
def list_accounts():
|
||||
accounts = load_accounts()
|
||||
exchange = (request.args.get("exchange") or "").strip().lower()
|
||||
if exchange:
|
||||
if exchange not in VALID_EXCHANGES:
|
||||
return jsonify({"error": "无效交易所"}), 400
|
||||
accounts = [a for a in accounts if a.get("exchange") == exchange]
|
||||
return jsonify(accounts)
|
||||
@app.route("/api/auth/status", methods=["GET"])
|
||||
def auth_status():
|
||||
return jsonify({"logged_in": bool(session.get("authenticated"))})
|
||||
|
||||
|
||||
@app.route("/api/accounts", methods=["POST"])
|
||||
def create_account():
|
||||
@app.route("/api/auth/login", methods=["POST"])
|
||||
def auth_login():
|
||||
body = request.get_json(silent=True) or {}
|
||||
account, err = validate_account_payload(body)
|
||||
username = (body.get("username") or "").strip()
|
||||
password = body.get("password") or ""
|
||||
if username != env_config.get_auth_username() or not env_config.verify_password(password):
|
||||
return jsonify({"error": "用户名或密码错误"}), 401
|
||||
session["authenticated"] = True
|
||||
session.permanent = True
|
||||
return jsonify({"ok": True})
|
||||
|
||||
|
||||
@app.route("/api/auth/logout", methods=["POST"])
|
||||
@login_required
|
||||
def auth_logout():
|
||||
session.clear()
|
||||
return jsonify({"ok": True})
|
||||
|
||||
|
||||
@app.route("/api/settings", methods=["GET"])
|
||||
@login_required
|
||||
def get_settings():
|
||||
s = load_settings()
|
||||
return jsonify(
|
||||
{
|
||||
"builtin_types": BUILTIN_TYPES,
|
||||
"custom_types": s.get("custom_types", []),
|
||||
"username": env_config.get_auth_username(),
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@app.route("/api/settings/auth", methods=["PUT"])
|
||||
@login_required
|
||||
def update_settings_auth():
|
||||
body = request.get_json(silent=True) or {}
|
||||
current = body.get("current_password") or ""
|
||||
if not env_config.verify_password(current):
|
||||
return jsonify({"error": "当前密码错误"}), 400
|
||||
username = (body.get("username") or "").strip()
|
||||
new_pass = body.get("new_password") or ""
|
||||
if not username:
|
||||
return jsonify({"error": "用户名不能为空"}), 400
|
||||
if len(new_pass) < 6:
|
||||
return jsonify({"error": "新密码至少 6 位"}), 400
|
||||
env_config.update_auth(username, new_pass)
|
||||
return jsonify({"ok": True, "username": username})
|
||||
|
||||
|
||||
@app.route("/api/settings/types", methods=["POST"])
|
||||
@login_required
|
||||
def add_custom_type():
|
||||
body = request.get_json(silent=True) or {}
|
||||
type_id = (body.get("id") or "").strip().lower()
|
||||
label = (body.get("label") or "").strip()
|
||||
fields = body.get("fields") or []
|
||||
if not re.match(r"^[a-z][a-z0-9_]{1,31}$", type_id):
|
||||
return jsonify({"error": "类型 ID 需为小写字母开头,仅含 a-z0-9_"}), 400
|
||||
if not label:
|
||||
return jsonify({"error": "显示名称不能为空"}), 400
|
||||
if get_type_def(type_id):
|
||||
return jsonify({"error": "类型 ID 已存在"}), 400
|
||||
if not fields:
|
||||
return jsonify({"error": "至少添加一个字段"}), 400
|
||||
norm_fields = []
|
||||
for f in fields:
|
||||
key = (f.get("key") or "").strip().lower()
|
||||
if not re.match(r"^[a-z][a-z0-9_]{0,31}$", key):
|
||||
return jsonify({"error": f"无效字段 key: {key}"}), 400
|
||||
norm_fields.append(
|
||||
{
|
||||
"key": key,
|
||||
"label": (f.get("label") or key).strip(),
|
||||
"type": f.get("type") or "text",
|
||||
"required": bool(f.get("required")),
|
||||
}
|
||||
)
|
||||
s = load_settings()
|
||||
entry = {"id": type_id, "label": label, "builtin": False, "fields": norm_fields}
|
||||
s["custom_types"].append(entry)
|
||||
save_settings(s)
|
||||
return jsonify(entry), 201
|
||||
|
||||
|
||||
@app.route("/api/settings/types/<type_id>", methods=["DELETE"])
|
||||
@login_required
|
||||
def delete_custom_type(type_id):
|
||||
s = load_settings()
|
||||
before = len(s["custom_types"])
|
||||
s["custom_types"] = [t for t in s["custom_types"] if t["id"] != type_id]
|
||||
if len(s["custom_types"]) == before:
|
||||
return jsonify({"error": "自定义类型不存在"}), 404
|
||||
save_settings(s)
|
||||
records = [r for r in load_records() if r.get("type_id") != type_id]
|
||||
save_records(records)
|
||||
return jsonify({"ok": True})
|
||||
|
||||
|
||||
@app.route("/api/credentials", methods=["GET"])
|
||||
@login_required
|
||||
def list_credentials():
|
||||
records = load_records()
|
||||
type_id = (request.args.get("type_id") or "").strip()
|
||||
q = (request.args.get("q") or "").strip()
|
||||
if type_id == "exchange" and request.args.get("exchange"):
|
||||
ex = request.args.get("exchange").strip().lower()
|
||||
records = [
|
||||
r
|
||||
for r in records
|
||||
if r.get("type_id") == "exchange" and r.get("fields", {}).get("exchange") == ex
|
||||
]
|
||||
else:
|
||||
records = filter_records(records, type_id=type_id or None, q=q or None)
|
||||
return jsonify(records)
|
||||
|
||||
|
||||
@app.route("/api/credentials", methods=["POST"])
|
||||
@login_required
|
||||
def create_credential():
|
||||
body = request.get_json(silent=True) or {}
|
||||
record, err = validate_record_payload(body)
|
||||
if err:
|
||||
return jsonify({"error": err}), 400
|
||||
record["id"] = str(uuid.uuid4())
|
||||
records = load_records()
|
||||
records.append(record)
|
||||
save_records(records)
|
||||
return jsonify(record), 201
|
||||
|
||||
accounts = load_accounts()
|
||||
account["id"] = str(uuid.uuid4())
|
||||
accounts.append(account)
|
||||
save_accounts(accounts)
|
||||
return jsonify(account), 201
|
||||
|
||||
@app.route("/api/credentials/<record_id>", methods=["DELETE"])
|
||||
@login_required
|
||||
def delete_credential(record_id):
|
||||
records = load_records()
|
||||
new_records = [r for r in records if r.get("id") != record_id]
|
||||
if len(new_records) == len(records):
|
||||
return jsonify({"error": "记录不存在"}), 404
|
||||
save_records(new_records)
|
||||
return jsonify({"ok": True})
|
||||
|
||||
|
||||
# 兼容旧前端路径
|
||||
@app.route("/api/accounts", methods=["GET", "POST"])
|
||||
@login_required
|
||||
def accounts_compat():
|
||||
if request.method == "GET":
|
||||
return list_credentials()
|
||||
return create_credential()
|
||||
|
||||
|
||||
@app.route("/api/accounts/<account_id>", methods=["DELETE"])
|
||||
def delete_account(account_id):
|
||||
accounts = load_accounts()
|
||||
new_accounts = [a for a in accounts if a.get("id") != account_id]
|
||||
if len(new_accounts) == len(accounts):
|
||||
return jsonify({"error": "账户不存在"}), 404
|
||||
save_accounts(new_accounts)
|
||||
return jsonify({"ok": True})
|
||||
@login_required
|
||||
def accounts_delete_compat(account_id):
|
||||
return delete_credential(account_id)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -0,0 +1,81 @@
|
||||
"""读取与更新 .env 配置"""
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from dotenv import load_dotenv, set_key
|
||||
from werkzeug.security import check_password_hash, generate_password_hash
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
ENV_FILE = BASE_DIR / ".env"
|
||||
|
||||
|
||||
def _valid_fernet_key(value: str) -> bool:
|
||||
if not value or "请" in value or len(value) < 40:
|
||||
return False
|
||||
try:
|
||||
Fernet(value.encode())
|
||||
return True
|
||||
except (ValueError, Exception):
|
||||
return False
|
||||
|
||||
|
||||
def ensure_env():
|
||||
if not ENV_FILE.exists():
|
||||
example = BASE_DIR / ".env.example"
|
||||
if example.exists():
|
||||
ENV_FILE.write_text(example.read_text(encoding="utf-8"), encoding="utf-8")
|
||||
else:
|
||||
ENV_FILE.write_text("", encoding="utf-8")
|
||||
load_dotenv(ENV_FILE, override=True)
|
||||
changed = False
|
||||
if not os.getenv("SECRET_KEY") or "请" in os.getenv("SECRET_KEY", ""):
|
||||
set_key(str(ENV_FILE), "SECRET_KEY", os.urandom(32).hex())
|
||||
changed = True
|
||||
if not _valid_fernet_key(os.getenv("DATA_ENCRYPTION_KEY", "")):
|
||||
set_key(str(ENV_FILE), "DATA_ENCRYPTION_KEY", Fernet.generate_key().decode())
|
||||
changed = True
|
||||
if not os.getenv("AUTH_USERNAME"):
|
||||
set_key(str(ENV_FILE), "AUTH_USERNAME", "admin")
|
||||
changed = True
|
||||
if not os.getenv("AUTH_PASSWORD"):
|
||||
set_key(str(ENV_FILE), "AUTH_PASSWORD", generate_password_hash("admin123"))
|
||||
changed = True
|
||||
if changed:
|
||||
load_dotenv(ENV_FILE, override=True)
|
||||
|
||||
|
||||
def reload_env():
|
||||
load_dotenv(ENV_FILE, override=True)
|
||||
|
||||
|
||||
def get_secret_key():
|
||||
return os.getenv("SECRET_KEY", "dev-insecure-key")
|
||||
|
||||
|
||||
def get_encryption_key():
|
||||
raw = os.getenv("DATA_ENCRYPTION_KEY", "")
|
||||
if not raw:
|
||||
raise ValueError("缺少 DATA_ENCRYPTION_KEY,请检查 .env")
|
||||
return raw.encode() if isinstance(raw, str) else raw
|
||||
|
||||
|
||||
def get_auth_username():
|
||||
return os.getenv("AUTH_USERNAME", "admin")
|
||||
|
||||
|
||||
def verify_password(plain: str) -> bool:
|
||||
stored = os.getenv("AUTH_PASSWORD", "")
|
||||
if not stored:
|
||||
return False
|
||||
if stored.startswith("pbkdf2:") or stored.startswith("scrypt:"):
|
||||
return check_password_hash(stored, plain)
|
||||
return stored == plain
|
||||
|
||||
|
||||
def update_auth(username: str, password: str):
|
||||
ensure_env()
|
||||
set_key(str(ENV_FILE), "AUTH_USERNAME", username)
|
||||
set_key(str(ENV_FILE), "AUTH_PASSWORD", generate_password_hash(password))
|
||||
reload_env()
|
||||
+554
-456
File diff suppressed because it is too large
Load Diff
@@ -1 +1,4 @@
|
||||
flask>=3.0.0,<4.0.0
|
||||
python-dotenv>=1.0.0,<2.0.0
|
||||
cryptography>=42.0.0,<44.0.0
|
||||
werkzeug>=3.0.0,<4.0.0
|
||||
|
||||
+280
@@ -0,0 +1,280 @@
|
||||
"""加密存储、旧版 data.json 迁移"""
|
||||
import json
|
||||
import uuid
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.fernet import Fernet, InvalidToken
|
||||
|
||||
from env_config import get_encryption_key
|
||||
|
||||
BASE_DIR = Path(__file__).resolve().parent
|
||||
DATA_FILE = BASE_DIR / "data.json"
|
||||
SETTINGS_FILE = BASE_DIR / "settings.json"
|
||||
VALID_EXCHANGES = frozenset({"binance", "okx", "gate"})
|
||||
|
||||
BUILTIN_TYPES = [
|
||||
{
|
||||
"id": "exchange",
|
||||
"label": "交易所 API",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "exchange", "label": "交易所", "type": "select", "options": ["binance", "okx", "gate"]},
|
||||
{"key": "username", "label": "账户名称", "type": "text", "required": True},
|
||||
{"key": "api_key", "label": "API Key", "type": "secret", "required": True},
|
||||
{"key": "api_secret", "label": "API Secret", "type": "secret", "required": True},
|
||||
{"key": "password", "label": "OKX 密码", "type": "secret", "required": True, "when": {"exchange": "okx"}},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "website",
|
||||
"label": "网站账号",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "title", "label": "名称", "type": "text", "required": True},
|
||||
{"key": "url", "label": "网址", "type": "url", "required": True},
|
||||
{"key": "username", "label": "用户名", "type": "text", "required": True},
|
||||
{"key": "password", "label": "密码", "type": "secret", "required": True},
|
||||
{"key": "note", "label": "备注", "type": "text"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "email",
|
||||
"label": "邮箱账户",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "title", "label": "名称", "type": "text", "required": True},
|
||||
{"key": "email", "label": "邮箱地址", "type": "email", "required": True},
|
||||
{"key": "username", "label": "用户名", "type": "text"},
|
||||
{"key": "password", "label": "密码", "type": "secret", "required": True},
|
||||
{"key": "web_url", "label": "网页登录地址", "type": "url"},
|
||||
{"key": "note", "label": "备注", "type": "text"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "wecom_bot",
|
||||
"label": "企业微信机器人",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "title", "label": "名称", "type": "text", "required": True},
|
||||
{"key": "webhook", "label": "Webhook", "type": "secret", "required": True},
|
||||
{"key": "note", "label": "备注", "type": "text"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "dingtalk_bot",
|
||||
"label": "钉钉机器人",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "title", "label": "名称", "type": "text", "required": True},
|
||||
{"key": "webhook", "label": "Webhook", "type": "secret", "required": True},
|
||||
{"key": "note", "label": "备注", "type": "text"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "wechat",
|
||||
"label": "微信",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "title", "label": "名称", "type": "text", "required": True},
|
||||
{"key": "wechat_id", "label": "微信号", "type": "text", "required": True},
|
||||
{"key": "phone", "label": "手机号", "type": "phone", "required": True},
|
||||
{"key": "password", "label": "密码", "type": "secret"},
|
||||
{"key": "note", "label": "备注", "type": "text"},
|
||||
],
|
||||
},
|
||||
{
|
||||
"id": "qq",
|
||||
"label": "QQ",
|
||||
"builtin": True,
|
||||
"fields": [
|
||||
{"key": "title", "label": "名称", "type": "text", "required": True},
|
||||
{"key": "qq_number", "label": "QQ号", "type": "text", "required": True},
|
||||
{"key": "phone", "label": "手机号", "type": "phone", "required": True},
|
||||
{"key": "password", "label": "密码", "type": "secret"},
|
||||
{"key": "note", "label": "备注", "type": "text"},
|
||||
],
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
def _fernet():
|
||||
return Fernet(get_encryption_key())
|
||||
|
||||
|
||||
def default_settings():
|
||||
return {"custom_types": []}
|
||||
|
||||
|
||||
def load_settings():
|
||||
if not SETTINGS_FILE.exists():
|
||||
s = default_settings()
|
||||
save_settings(s)
|
||||
return s
|
||||
try:
|
||||
with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
if "custom_types" not in data:
|
||||
data["custom_types"] = []
|
||||
return data
|
||||
except (json.JSONDecodeError, OSError):
|
||||
return default_settings()
|
||||
|
||||
|
||||
def save_settings(data):
|
||||
with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(data, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def all_type_defs():
|
||||
settings = load_settings()
|
||||
return BUILTIN_TYPES + settings.get("custom_types", [])
|
||||
|
||||
|
||||
def get_type_def(type_id: str):
|
||||
for t in all_type_defs():
|
||||
if t["id"] == type_id:
|
||||
return t
|
||||
return None
|
||||
|
||||
|
||||
def _legacy_to_record(item: dict) -> dict:
|
||||
exchange = (item.get("exchange") or "binance").lower()
|
||||
fields = {
|
||||
"exchange": exchange,
|
||||
"username": item.get("username", ""),
|
||||
"api_key": item.get("api_key", ""),
|
||||
"api_secret": item.get("api_secret", ""),
|
||||
}
|
||||
if item.get("password"):
|
||||
fields["password"] = item["password"]
|
||||
title = fields.get("username") or exchange
|
||||
return {
|
||||
"id": item.get("id") or str(uuid.uuid4()),
|
||||
"type_id": "exchange",
|
||||
"title": title,
|
||||
"fields": fields,
|
||||
}
|
||||
|
||||
|
||||
def _read_raw_file():
|
||||
if not DATA_FILE.exists():
|
||||
return None
|
||||
with open(DATA_FILE, "r", encoding="utf-8") as f:
|
||||
return json.load(f)
|
||||
|
||||
|
||||
def _migrate_plain_list(items: list) -> list:
|
||||
records = []
|
||||
for item in items:
|
||||
if isinstance(item, dict) and item.get("type_id"):
|
||||
records.append(item)
|
||||
elif isinstance(item, dict) and item.get("exchange"):
|
||||
records.append(_legacy_to_record(item))
|
||||
elif isinstance(item, dict):
|
||||
records.append(item)
|
||||
return records
|
||||
|
||||
|
||||
def load_records():
|
||||
raw = _read_raw_file()
|
||||
if raw is None:
|
||||
return []
|
||||
|
||||
if isinstance(raw, list):
|
||||
records = _migrate_plain_list(raw)
|
||||
save_records(records)
|
||||
return records
|
||||
|
||||
if isinstance(raw, dict) and raw.get("v") == 2:
|
||||
token = raw.get("payload", "")
|
||||
if not token:
|
||||
return []
|
||||
try:
|
||||
plain = _fernet().decrypt(token.encode()).decode()
|
||||
data = json.loads(plain)
|
||||
return data if isinstance(data, list) else []
|
||||
except (InvalidToken, json.JSONDecodeError):
|
||||
return []
|
||||
|
||||
return []
|
||||
|
||||
|
||||
def save_records(records: list):
|
||||
plain = json.dumps(records, ensure_ascii=False).encode()
|
||||
token = _fernet().encrypt(plain).decode()
|
||||
payload = {"v": 2, "payload": token}
|
||||
with open(DATA_FILE, "w", encoding="utf-8") as f:
|
||||
json.dump(payload, f, ensure_ascii=False, indent=2)
|
||||
|
||||
|
||||
def record_title(type_id: str, fields: dict) -> str:
|
||||
for key in ("title", "username", "email", "wechat_id", "qq_number"):
|
||||
if fields.get(key):
|
||||
return str(fields[key])
|
||||
if type_id == "exchange":
|
||||
ex = fields.get("exchange", "")
|
||||
un = fields.get("username", "")
|
||||
return f"{ex} {un}".strip() or "未命名"
|
||||
return "未命名"
|
||||
|
||||
|
||||
def _field_applies(fd: dict, fields_in: dict) -> bool:
|
||||
when = fd.get("when")
|
||||
if not when:
|
||||
return True
|
||||
return all(str(fields_in.get(k, "")).strip() == str(v) for k, v in when.items())
|
||||
|
||||
|
||||
def validate_record_payload(payload: dict):
|
||||
type_id = (payload.get("type_id") or "").strip()
|
||||
type_def = get_type_def(type_id)
|
||||
if not type_def:
|
||||
return None, "无效的凭证类型"
|
||||
|
||||
fields_in = payload.get("fields") or {}
|
||||
if not isinstance(fields_in, dict):
|
||||
return None, "字段格式错误"
|
||||
|
||||
fields = {}
|
||||
for fd in type_def.get("fields", []):
|
||||
if not _field_applies(fd, fields_in):
|
||||
continue
|
||||
key = fd["key"]
|
||||
val = fields_in.get(key)
|
||||
if val is None:
|
||||
val = ""
|
||||
val = str(val).strip() if not isinstance(val, str) else val.strip()
|
||||
if fd.get("required") and not val:
|
||||
return None, f"{fd.get('label', key)} 不能为空"
|
||||
fields[key] = val
|
||||
|
||||
if type_id == "exchange":
|
||||
ex = fields.get("exchange", "").lower()
|
||||
if ex not in VALID_EXCHANGES:
|
||||
return None, "无效交易所"
|
||||
for req in ("username", "api_key", "api_secret"):
|
||||
if not fields.get(req):
|
||||
return None, f"{req} 不能为空"
|
||||
if ex == "okx" and not fields.get("password"):
|
||||
return None, "OKX 密码不能为空"
|
||||
|
||||
return {
|
||||
"type_id": type_id,
|
||||
"title": record_title(type_id, fields),
|
||||
"fields": fields,
|
||||
}, None
|
||||
|
||||
|
||||
def filter_records(records, type_id=None, q=None):
|
||||
out = records
|
||||
if type_id:
|
||||
out = [r for r in out if r.get("type_id") == type_id]
|
||||
if q:
|
||||
q = q.lower()
|
||||
filtered = []
|
||||
for r in out:
|
||||
blob = json.dumps(r, ensure_ascii=False).lower()
|
||||
if q in blob:
|
||||
filtered.append(r)
|
||||
out = filtered
|
||||
return out
|
||||
Reference in New Issue
Block a user