e5a586f903
Move business code under modules/, env template to config/, PM2 single qihuo process, and _legacy shims for old imports. Co-authored-by: Cursor <cursoragent@cursor.com>
87 lines
3.0 KiB
Python
87 lines
3.0 KiB
Python
# Copyright (c) 2025-2026 马建军. All rights reserved.
|
||
# 专有软件 — 未经授权禁止复制、传播、转售。
|
||
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
|
||
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
|
||
|
||
"""Web 登录账号:settings 表 + .env 同步。"""
|
||
from __future__ import annotations
|
||
|
||
import os
|
||
import re
|
||
from typing import Callable
|
||
|
||
from werkzeug.security import check_password_hash, generate_password_hash
|
||
|
||
from modules.core.env_file import update_env_vars
|
||
|
||
ADMIN_USERNAME_KEY = "ADMIN_USERNAME"
|
||
ADMIN_PASSWORD_KEY = "ADMIN_PASSWORD"
|
||
|
||
|
||
def save_admin_credentials(
|
||
*,
|
||
username: str,
|
||
old_password: str,
|
||
new_password: str,
|
||
new_password2: str,
|
||
get_setting: Callable[[str, str], str],
|
||
set_setting: Callable[[str, str], None],
|
||
) -> tuple[bool, str, dict[str, str]]:
|
||
"""
|
||
校验原密码后更新用户名/密码,写入 settings 与 .env。
|
||
返回 (成功, 提示, env_updates)。
|
||
"""
|
||
username = (username or "").strip()
|
||
old_password = old_password or ""
|
||
new_password = new_password or ""
|
||
new_password2 = new_password2 or ""
|
||
|
||
if not username:
|
||
return False, "用户名不能为空", {}
|
||
if len(username) > 64:
|
||
return False, "用户名过长(最多 64 字符)", {}
|
||
if not re.match(r"^[A-Za-z0-9_.@-]+$", username):
|
||
return False, "用户名仅支持字母、数字及 _ . @ -", {}
|
||
|
||
admin_hash = get_setting("admin_password_hash")
|
||
if not admin_hash or not check_password_hash(admin_hash, old_password):
|
||
return False, "原密码错误", {}
|
||
|
||
current_username = (get_setting("admin_username") or "").strip()
|
||
password_change = bool(new_password or new_password2)
|
||
|
||
if password_change:
|
||
if not new_password or not new_password2:
|
||
return False, "请同时填写新密码与确认密码", {}
|
||
if len(new_password) < 6:
|
||
return False, "新密码至少 6 位", {}
|
||
if new_password != new_password2:
|
||
return False, "两次新密码不一致", {}
|
||
|
||
username_changed = username != current_username
|
||
if not username_changed and not password_change:
|
||
return False, "未修改任何内容", {}
|
||
|
||
set_setting("admin_username", username)
|
||
env_updates: dict[str, str] = {ADMIN_USERNAME_KEY: username}
|
||
|
||
if password_change:
|
||
set_setting("admin_password_hash", generate_password_hash(new_password))
|
||
env_updates[ADMIN_PASSWORD_KEY] = new_password
|
||
|
||
try:
|
||
update_env_vars(env_updates)
|
||
except OSError as exc:
|
||
return False, f"数据库已更新,但写入 .env 失败:{exc}", env_updates
|
||
|
||
for key, val in env_updates.items():
|
||
os.environ[key] = val
|
||
|
||
parts: list[str] = []
|
||
if username_changed:
|
||
parts.append("用户名已更新")
|
||
if password_change:
|
||
parts.append("密码已更新")
|
||
parts.append("已同步至 .env")
|
||
return True, ";".join(parts), env_updates
|