f1ad4273f4
- FastAPI 单进程托管 frontend/dist,systemd 替代 PM2 - 超级管理员 admin、注册开关与用户管理 - README/DEPLOY/USAGE 说明:改代码须本地构建 dist 后 push,服务器 update.sh - 提交 frontend/dist 与 build-frontend 脚本
81 lines
3.1 KiB
Python
81 lines
3.1 KiB
Python
import uuid
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from jose import JWTError, jwt
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import settings
|
|
from app.core.database import get_db
|
|
from app.core.deps import get_current_user
|
|
from app.core.security import (
|
|
create_access_token,
|
|
create_refresh_token,
|
|
get_password_hash,
|
|
verify_password,
|
|
)
|
|
from app.models.user import SystemSettings, User
|
|
from app.schemas import RefreshRequest, TokenResponse, UserLogin, UserOut, UserRegister
|
|
|
|
router = APIRouter(prefix="/auth", tags=["auth"])
|
|
|
|
|
|
def get_or_create_settings(db: Session) -> SystemSettings:
    """Return the ``SystemSettings`` row with primary key 1, creating it if absent.

    When no such row exists, one is inserted with ``registration_enabled=True``,
    committed, and refreshed from the database before being returned.

    Args:
        db: SQLAlchemy session used for the lookup and, if needed, the insert.

    Returns:
        The existing or newly created ``SystemSettings`` instance.
    """
    existing = db.get(SystemSettings, 1)
    if existing is not None:
        return existing

    # First access: persist the default row so later lookups find it.
    created = SystemSettings(id=1, registration_enabled=True)
    db.add(created)
    db.commit()
    db.refresh(created)
    return created
|
|
|
|
|
|
@router.post("/register", response_model=UserOut, status_code=status.HTTP_201_CREATED)
def register(data: UserRegister, db: Session = Depends(get_db)):
    """Create a new user account from the submitted username and password.

    Args:
        data: Registration payload providing ``username`` and ``password``.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        The newly persisted ``User`` (serialized via ``UserOut``).

    Raises:
        HTTPException: 403 when the settings row has registration disabled;
            400 when a user with the same username already exists.
    """
    # The registration switch is checked before touching the users table.
    if not get_or_create_settings(db).registration_enabled:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="注册已关闭,请联系管理员",
        )

    duplicate = db.query(User).filter(User.username == data.username).first()
    if duplicate:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="用户名已存在",
        )

    # Only the hash produced by get_password_hash is stored on the row.
    hashed_password = get_password_hash(data.password)
    new_user = User(username=data.username, password_hash=hashed_password)
    db.add(new_user)
    db.commit()
    db.refresh(new_user)
    return new_user
|
|
|
|
|
|
@router.post("/login", response_model=TokenResponse)
def login(data: UserLogin, db: Session = Depends(get_db)):
    """Check the submitted credentials and issue an access/refresh token pair.

    Args:
        data: Login payload providing ``username`` and ``password``.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        A ``TokenResponse`` whose tokens are both created from the user's id.

    Raises:
        HTTPException: 401 when the username is unknown or the password does
            not verify; both cases share the same error detail.
    """
    account = db.query(User).filter(User.username == data.username).first()

    # Short-circuit keeps verify_password from running when no user was found.
    credentials_ok = account is not None and verify_password(
        data.password, account.password_hash
    )
    if not credentials_ok:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
        )

    subject = str(account.id)
    return TokenResponse(
        access_token=create_access_token(subject),
        refresh_token=create_refresh_token(subject),
    )
|
|
|
|
|
|
@router.post("/refresh", response_model=TokenResponse)
def refresh(data: RefreshRequest, db: Session = Depends(get_db)):
    """Exchange a refresh token for a new access/refresh token pair.

    The token is decoded with ``settings.SECRET_KEY`` and ``settings.ALGORITHM``.
    Its ``sub`` claim must be present and parse as a UUID, and its ``type``
    claim must equal ``"refresh"``.

    Args:
        data: Request payload providing ``refresh_token``.
        db: SQLAlchemy session injected through ``get_db``.

    Returns:
        A ``TokenResponse`` whose tokens are both created from the user's id.

    Raises:
        HTTPException: 401 when the token fails to decode, lacks ``sub``, has
            a ``type`` other than ``"refresh"``, carries a ``sub`` that is not
            a valid UUID, or refers to a user that cannot be found.
    """
    # Keep the try body to the one call that can raise JWTError.
    try:
        payload = jwt.decode(
            data.refresh_token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效刷新令牌",
        ) from exc

    user_id = payload.get("sub")
    if user_id is None or payload.get("type") != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效刷新令牌",
        )

    # A correctly signed token may still carry a subject that is not a UUID.
    # uuid.UUID raises ValueError for malformed strings (TypeError or
    # AttributeError for non-string input); report that as an invalid token
    # instead of letting it surface as an unhandled server error.
    try:
        user_uuid = uuid.UUID(user_id)
    except (ValueError, TypeError, AttributeError) as exc:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效刷新令牌",
        ) from exc

    user = db.get(User, user_uuid)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户不存在",
        )

    subject = str(user.id)
    return TokenResponse(
        access_token=create_access_token(subject),
        refresh_token=create_refresh_token(subject),
    )
|
|
|
|
|
|
@router.get("/me", response_model=UserOut)
def me(current_user: User = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency.

    Args:
        current_user: ``User`` instance supplied by ``get_current_user``.

    Returns:
        The same ``User`` object, serialized via ``UserOut``.
    """
    return current_user
|