"""Credential vault — Flask backend.

Login credentials and the encryption master key live in ``.env``; data is
stored encrypted in ``data.json``.
"""

import re
import uuid
from functools import wraps

from flask import Flask, jsonify, request, send_from_directory, session

import env_config
from storage import (
    BUILTIN_TYPES,
    all_type_defs,
    filter_records,
    get_type_def,
    load_records,
    load_settings,
    save_records,
    save_settings,
    validate_record_payload,
)

env_config.ensure_env()

app = Flask(__name__)
app.secret_key = env_config.get_secret_key()
app.config.update(
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE="Lax",
    PERMANENT_SESSION_LIFETIME=1800,  # seconds
)

BASE_DIR = env_config.BASE_DIR

# API paths that may be called without an authenticated session.
PUBLIC_API = {"/api/auth/login", "/api/auth/status"}

# Custom type IDs: lowercase letter followed by 1-31 of [a-z0-9_].
_TYPE_ID_RE = re.compile(r"[a-z][a-z0-9_]{1,31}")
# Field keys: lowercase letter followed by 0-31 of [a-z0-9_].
_FIELD_KEY_RE = re.compile(r"[a-z][a-z0-9_]{0,31}")


def _json_body():
    """Return the request's JSON payload as a dict.

    Missing, malformed, or non-object JSON (e.g. a list or a number) yields an
    empty dict so handlers can call ``.get`` without crashing.
    """
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def _as_text(value):
    """Return *value* unchanged when it is a ``str``, otherwise ``""``."""
    return value if isinstance(value, str) else ""


def login_required(fn):
    """Decorate a view so it answers 401 unless the session is authenticated."""

    @wraps(fn)
    def wrapper(*args, **kwargs):
        if not session.get("authenticated"):
            return jsonify({"error": "未登录"}), 401
        return fn(*args, **kwargs)

    return wrapper


@app.before_request
def guard_api():
    """Reject unauthenticated requests to any ``/api/`` path not in PUBLIC_API.

    Returning ``None`` lets the request continue to its view.
    """
    if not request.path.startswith("/api/"):
        return None
    if request.path in PUBLIC_API:
        return None
    if not session.get("authenticated"):
        return jsonify({"error": "未登录"}), 401
    return None


@app.route("/")
def index():
    """Serve ``index.html`` from BASE_DIR."""
    return send_from_directory(BASE_DIR, "index.html")


@app.route("/api/auth/status", methods=["GET"])
def auth_status():
    """Report whether the current session is authenticated."""
    return jsonify({"logged_in": bool(session.get("authenticated"))})


@app.route("/api/auth/login", methods=["POST"])
def auth_login():
    """Authenticate with ``username`` / ``password`` from the JSON body.

    Returns ``{"ok": true}`` and marks the session authenticated on success,
    or a 401 JSON error when either credential is wrong.
    """
    body = _json_body()
    username = _as_text(body.get("username")).strip()
    password = _as_text(body.get("password"))

    # Evaluate both checks unconditionally: short-circuiting on the username
    # would make the two failure paths do different amounts of work.
    username_ok = username == env_config.get_auth_username()
    password_ok = env_config.verify_password(password)
    if not (username_ok and password_ok):
        return jsonify({"error": "用户名或密码错误"}), 401

    session["authenticated"] = True
    # Permanent sessions are the ones governed by PERMANENT_SESSION_LIFETIME.
    session.permanent = True
    return jsonify({"ok": True})


@app.route("/api/auth/logout", methods=["POST"])
@login_required
def auth_logout():
    """Clear the session."""
    session.clear()
    return jsonify({"ok": True})


@app.route("/api/settings", methods=["GET"])
@login_required
def get_settings():
    """Return built-in types, custom types, and the configured username."""
    settings = load_settings()
    return jsonify(
        {
            "builtin_types": BUILTIN_TYPES,
            "custom_types": settings.get("custom_types", []),
            "username": env_config.get_auth_username(),
        }
    )


@app.route("/api/settings/auth", methods=["PUT"])
@login_required
def update_settings_auth():
    """Change the login username and password.

    The JSON body must carry the correct ``current_password``, a non-empty
    ``username``, and a ``new_password`` of at least 6 characters; otherwise
    a 400 JSON error is returned.
    """
    body = _json_body()
    current = _as_text(body.get("current_password"))
    if not env_config.verify_password(current):
        return jsonify({"error": "当前密码错误"}), 400

    username = _as_text(body.get("username")).strip()
    new_pass = _as_text(body.get("new_password"))
    if not username:
        return jsonify({"error": "用户名不能为空"}), 400
    if len(new_pass) < 6:
        return jsonify({"error": "新密码至少 6 位"}), 400

    env_config.update_auth(username, new_pass)
    return jsonify({"ok": True, "username": username})


@app.route("/api/settings/types", methods=["POST"])
@login_required
def add_custom_type():
    """Create a custom credential type from ``id``, ``label`` and ``fields``.

    Returns the stored type entry with status 201, or a 400 JSON error when
    the ID/label/fields are invalid or the ID already exists.
    """
    body = _json_body()
    type_id = _as_text(body.get("id")).strip().lower()
    label = _as_text(body.get("label")).strip()
    fields = body.get("fields")
    if not isinstance(fields, list):
        # Anything that is not a list cannot contain field definitions.
        fields = []

    if not _TYPE_ID_RE.fullmatch(type_id):
        return jsonify({"error": "类型 ID 需为小写字母开头,仅含 a-z0-9_"}), 400
    if not label:
        return jsonify({"error": "显示名称不能为空"}), 400
    if get_type_def(type_id):
        return jsonify({"error": "类型 ID 已存在"}), 400
    if not fields:
        return jsonify({"error": "至少添加一个字段"}), 400

    norm_fields = []
    for raw in fields:
        # A non-dict entry has no usable key; report it as an invalid key.
        spec = raw if isinstance(raw, dict) else {}
        key = _as_text(spec.get("key")).strip().lower()
        if not _FIELD_KEY_RE.fullmatch(key):
            return jsonify({"error": f"无效字段 key: {key}"}), 400
        norm_fields.append(
            {
                "key": key,
                "label": (_as_text(spec.get("label")) or key).strip(),
                "type": spec.get("type") or "text",
                "required": bool(spec.get("required")),
            }
        )

    settings = load_settings()
    entry = {
        "id": type_id,
        "label": label,
        "builtin": False,
        "fields": norm_fields,
    }
    settings.setdefault("custom_types", []).append(entry)
    save_settings(settings)
    return jsonify(entry), 201


@app.route("/api/settings/types/<string:type_id>", methods=["DELETE"])
@login_required
def delete_custom_type(type_id):
    """Delete a custom type and every record whose ``type_id`` matches it.

    Returns 404 when no custom type has the given ID.
    """
    settings = load_settings()
    existing = settings.get("custom_types", [])
    remaining = [t for t in existing if t["id"] != type_id]
    if len(remaining) == len(existing):
        return jsonify({"error": "自定义类型不存在"}), 404

    settings["custom_types"] = remaining
    save_settings(settings)

    # Drop the records that belonged to the removed type.
    records = [r for r in load_records() if r.get("type_id") != type_id]
    save_records(records)
    return jsonify({"ok": True})


@app.route("/api/credentials", methods=["GET"])
@login_required
def list_credentials():
    """List records, filtered by the ``type_id``, ``q`` and ``exchange`` args.

    With ``type_id=exchange`` and a non-empty ``exchange`` argument, only
    exchange records whose ``fields.exchange`` equals that value (stripped,
    lower-cased) are returned; otherwise filtering is delegated to
    ``filter_records``.
    """
    records = load_records()
    type_id = (request.args.get("type_id") or "").strip()
    q = (request.args.get("q") or "").strip()
    exchange = request.args.get("exchange")

    if type_id == "exchange" and exchange:
        # NOTE(review): ``q`` is not applied in this branch — confirm intended.
        wanted = exchange.strip().lower()
        records = [
            r
            for r in records
            if r.get("type_id") == "exchange"
            and (r.get("fields") or {}).get("exchange") == wanted
        ]
    else:
        records = filter_records(records, type_id=type_id or None, q=q or None)
    return jsonify(records)


@app.route("/api/credentials", methods=["POST"])
@login_required
def create_credential():
    """Validate the JSON body, assign a UUID4 ``id`` and store the record.

    Returns the stored record with status 201, or a 400 JSON error carrying
    the validation message.
    """
    record, err = validate_record_payload(_json_body())
    if err:
        return jsonify({"error": err}), 400

    record["id"] = str(uuid.uuid4())
    records = load_records()
    records.append(record)
    save_records(records)
    return jsonify(record), 201


@app.route("/api/credentials/<string:record_id>", methods=["DELETE"])
@login_required
def delete_credential(record_id):
    """Delete the record with the given ID; 404 when it does not exist."""
    records = load_records()
    kept = [r for r in records if r.get("id") != record_id]
    if len(kept) == len(records):
        return jsonify({"error": "记录不存在"}), 404
    save_records(kept)
    return jsonify({"ok": True})


# Compatibility routes for the legacy front-end paths.
@app.route("/api/accounts", methods=["GET", "POST"])
@login_required
def accounts_compat():
    """Legacy alias: GET lists credentials, POST creates one."""
    if request.method == "GET":
        return list_credentials()
    return create_credential()


@app.route("/api/accounts/<string:account_id>", methods=["DELETE"])
@login_required
def accounts_delete_compat(account_id):
    """Legacy alias for deleting a credential by ID."""
    return delete_credential(account_id)


if __name__ == "__main__":
    # NOTE(review): 0.0.0.0 listens on all interfaces — confirm this is
    # intended for a credential store.
    app.run(host="0.0.0.0", port=5200, debug=False)