# File: secondary-school-grade-archive/backend/app/routers/exams.py
# (354 lines, 12 KiB, Python)

import json
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session, joinedload
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import ExamRecord, SubjectScore, User
from app.schemas import (
ExamCreate,
ExamOut,
ExamReviewUpdate,
ExamUpdate,
ReviewInsightRequest,
ReviewInsightResponse,
ReviewStatusEnum,
ScoreOut,
TrendResponse,
)
from app.services import llm as llm_service
from app.services.score_trend import build_trend
from app.services.student_access import get_student_for_user
# Router that the exam endpoints in this module register on; tagged "exams".
router = APIRouter(tags=["exams"])

# Display labels keyed by the exam-type enum's ``.value``; callers fall back
# to the raw value when a key is missing.
_EXAM_TYPE_LABELS = {"weekly": "周考", "monthly": "月考", "final": "期末"}

# Display labels keyed by review-status enum member.
_REVIEW_STATUS_LABELS = {
    ReviewStatusEnum.careless: "粗心",
    ReviewStatusEnum.unknown: "不会",
    ReviewStatusEnum.nervous: "紧张",
    ReviewStatusEnum.normal: "正常发挥",
}
def _subject_display_name(score: SubjectScore) -> str:
    """Return the label used to identify the subject of ``score``.

    The related subject's ``name`` is used when the relationship is truthy;
    otherwise a placeholder built from ``subject_id`` is returned.
    """
    if score.subject:
        return score.subject.name
    return f"科目{score.subject_id}"
def _build_review_records_text(exams: list[ExamRecord], subject_name: str) -> str:
    """Render the reviewed scores of one subject as bullet lines, newest exam first.

    Exams are visited in descending ``exam_date`` order. A score contributes a
    line only when its display name equals ``subject_name`` and at least one
    review status could be parsed from ``review_statuses_json``.

    Each line contains the exam date, the exam-type label, obtained/total
    score, the ratio as a percentage with one decimal, and the status labels.
    The exam title is appended as a remark when it is truthy.

    Args:
        exams: Exam records whose ``scores`` are inspected.
        subject_name: Display name of the subject to keep.

    Returns:
        The lines joined with newlines, or an empty string when no score
        qualifies.
    """
    lines: list[str] = []
    for exam in sorted(exams, key=lambda item: item.exam_date, reverse=True):
        for score in exam.scores:
            if _subject_display_name(score) != subject_name:
                continue
            statuses = _parse_review_statuses(score.review_statuses_json)
            if not statuses:
                continue
            type_label = _EXAM_TYPE_LABELS.get(exam.exam_type.value, exam.exam_type.value)
            # Separate the labels: joining with "" made multi-status entries
            # run together (e.g. "粗心不会"), which is ambiguous to read.
            status_text = "、".join(_REVIEW_STATUS_LABELS.get(s, s.value) for s in statuses)
            ratio = float(score.ratio) * 100
            line = (
                f"- {exam.exam_date} {type_label} "
                f"得分{float(score.obtained_score):g}/{float(score.total_score):g}({ratio:.1f}%) "
                f"状态:{status_text}"
            )
            if exam.title:
                line += f" 备注:{exam.title}"
            lines.append(line)
    return "\n".join(lines)
def _parse_review_statuses(raw: str | None) -> list[ReviewStatusEnum]:
    """Decode a stored JSON string into review-status enum members.

    Returns an empty list when ``raw`` is falsy, is not valid JSON, or does
    not decode to a list. Entries that are not valid ``ReviewStatusEnum``
    values (after ``str()`` conversion) are skipped.
    """
    parsed: list[ReviewStatusEnum] = []
    if raw:
        try:
            decoded = json.loads(raw)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, list):
            for entry in decoded:
                try:
                    member = ReviewStatusEnum(str(entry))
                except ValueError:
                    # Unknown status value: ignore it and keep the rest.
                    continue
                parsed.append(member)
    return parsed
def _serialize_review_statuses(statuses: list[ReviewStatusEnum] | None) -> str | None:
    """Encode review statuses as a JSON array string for storage.

    Returns ``None`` when ``statuses`` is ``None`` or empty. Enum members are
    written as their ``.value``; anything else is written as ``str(item)``.
    Non-ASCII characters are kept as-is in the output.
    """
    if not statuses:
        return None
    encoded = [
        entry.value if isinstance(entry, ReviewStatusEnum) else str(entry)
        for entry in statuses
    ]
    return json.dumps(encoded, ensure_ascii=False)
def _score_to_out(score: SubjectScore) -> ScoreOut:
    """Convert a ``SubjectScore`` row into its ``ScoreOut`` response schema.

    Numeric columns are converted with ``float()``; ``subject_name`` is
    ``None`` when the subject relationship is falsy; the stored review-status
    JSON is decoded into enum members.
    """
    subject = score.subject
    fields = {
        "id": score.id,
        "subject_id": score.subject_id,
        "subject_name": subject.name if subject else None,
        "total_score": float(score.total_score),
        "obtained_score": float(score.obtained_score),
        "ratio": float(score.ratio),
        "review_statuses": _parse_review_statuses(score.review_statuses_json),
    }
    return ScoreOut(**fields)
def _exam_to_out(exam: ExamRecord) -> ExamOut:
    """Convert an ``ExamRecord`` and its scores into the ``ExamOut`` schema."""
    score_items = list(map(_score_to_out, exam.scores))
    return ExamOut(
        id=exam.id,
        exam_type=exam.exam_type,
        exam_date=exam.exam_date,
        title=exam.title,
        created_at=exam.created_at,
        scores=score_items,
    )
def _apply_scores(db: Session, exam: ExamRecord, scores_data):
    """Synchronise ``exam.scores`` with the submitted score items.

    For every item the matching score (by ``subject_id``) is updated in place,
    or a new ``SubjectScore`` is appended when the exam has none for that
    subject. Scores that existed before the call but whose subject is absent
    from ``scores_data`` are removed from the exam and deleted via ``db``.
    The stored ``ratio`` is ``obtained_score / total_score`` rounded to four
    decimals. Nothing is committed here; the caller owns the transaction.

    Args:
        db: Session used to delete scores that are no longer wanted.
        exam: Exam record whose ``scores`` collection is modified.
        scores_data: Iterable of items exposing ``subject_id``,
            ``total_score``, ``obtained_score`` and ``review_statuses``.

    Raises:
        HTTPException: 400 when an item's ``total_score`` is not positive
            (previously a zero total surfaced as an unhandled
            ``ZeroDivisionError``).
    """
    items = list(scores_data)

    # Validate everything up front so a bad item never leaves the exam
    # half-updated in the session.
    for item in items:
        if item.total_score <= 0:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"科目 {item.subject_id} 的总分必须大于 0",
            )

    # Snapshot of the scores present before this call; only these are
    # candidates for deletion below.
    existing_by_subject = {s.subject_id: s for s in list(exam.scores)}
    # Working lookup that also tracks rows created during this call, so a
    # subject repeated in the payload updates the row just added instead of
    # appending a second row for the same subject.
    current_by_subject = dict(existing_by_subject)
    keep_subject_ids: set[int] = set()

    for item in items:
        keep_subject_ids.add(item.subject_id)
        ratio = round(item.obtained_score / item.total_score, 4)
        review_json = _serialize_review_statuses(item.review_statuses)
        existing = current_by_subject.get(item.subject_id)
        if existing is not None:
            existing.total_score = item.total_score
            existing.obtained_score = item.obtained_score
            existing.ratio = ratio
            existing.review_statuses_json = review_json
        else:
            created = SubjectScore(
                subject_id=item.subject_id,
                total_score=item.total_score,
                obtained_score=item.obtained_score,
                ratio=ratio,
                review_statuses_json=review_json,
            )
            exam.scores.append(created)
            current_by_subject[item.subject_id] = created

    for subject_id, score in existing_by_subject.items():
        if subject_id not in keep_subject_ids:
            exam.scores.remove(score)
            db.delete(score)
@router.get("/students/{student_id}/exams", response_model=list[ExamOut])
def list_exams(
    student_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List a student's exams, newest first, with scores and subjects loaded.

    Ordering is by ``exam_date`` descending, then ``created_at`` descending.
    """
    # Access check for the current user; the returned student is not needed.
    # NOTE(review): presumably raises when the student is not the user's — confirm.
    get_student_for_user(db, student_id, current_user.id)

    eager_scores = joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
    query = db.query(ExamRecord).options(eager_scores)
    query = query.filter(ExamRecord.student_id == student_id)
    query = query.order_by(ExamRecord.exam_date.desc(), ExamRecord.created_at.desc())
    return list(map(_exam_to_out, query.all()))
@router.post("/students/{student_id}/exams", response_model=ExamOut, status_code=status.HTTP_201_CREATED)
def create_exam(
    student_id: uuid.UUID,
    data: ExamCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an exam record for a student, with optional subject scores.

    The record is flushed before scores are applied, then committed and
    re-read with scores and subjects eagerly loaded for the response.
    """
    get_student_for_user(db, student_id, current_user.id)

    record = ExamRecord(
        student_id=student_id,
        exam_type=data.exam_type,
        exam_date=data.exam_date,
        title=data.title,
    )
    db.add(record)
    db.flush()

    if data.scores:
        _apply_scores(db, record, data.scores)

    db.commit()
    db.refresh(record)

    # Reload so the response carries scores together with their subjects.
    eager_scores = joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
    reloaded = (
        db.query(ExamRecord)
        .options(eager_scores)
        .filter(ExamRecord.id == record.id)
        .first()
    )
    return _exam_to_out(reloaded)
@router.get("/exams/{exam_id}", response_model=ExamOut)
def get_exam(
    exam_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one exam with its scores.

    Responds with 404 both when the exam does not exist and when its student
    belongs to a different user.
    """
    eager_scores = joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
    found = (
        db.query(ExamRecord)
        .options(eager_scores)
        .join(ExamRecord.student)
        .filter(ExamRecord.id == exam_id)
        .first()
    )
    owned = found is not None and found.student.user_id == current_user.id
    if not owned:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")
    return _exam_to_out(found)
@router.patch("/exams/{exam_id}", response_model=ExamOut)
def update_exam(
    exam_id: uuid.UUID,
    data: ExamUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Partially update an exam and, when provided, replace its score set.

    Only fields that are not ``None`` in ``data`` are written. Responds with
    404 when the exam is missing or owned by another user.
    """
    eager_scores = joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
    exam = (
        db.query(ExamRecord)
        .options(eager_scores)
        .join(ExamRecord.student)
        .filter(ExamRecord.id == exam_id)
        .first()
    )
    owned = exam is not None and exam.student.user_id == current_user.id
    if not owned:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")

    # ``None`` means "leave unchanged" for each simple field.
    for field in ("exam_type", "exam_date", "title"):
        new_value = getattr(data, field)
        if new_value is not None:
            setattr(exam, field, new_value)

    # An empty list is still applied (it removes every existing score).
    if data.scores is not None:
        _apply_scores(db, exam, data.scores)

    db.commit()
    db.refresh(exam)

    reloaded = (
        db.query(ExamRecord)
        .options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
        .filter(ExamRecord.id == exam.id)
        .first()
    )
    return _exam_to_out(reloaded)
@router.patch("/exams/{exam_id}/review", response_model=ExamOut)
def update_exam_review(
    exam_id: uuid.UUID,
    data: ExamReviewUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Overwrite the review statuses of individual subject scores in an exam.

    Responds with 404 when the exam is missing or owned by another user, and
    with 400 when a review names a subject that has no score in this exam.
    """
    eager_scores = joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
    exam = (
        db.query(ExamRecord)
        .options(eager_scores)
        .join(ExamRecord.student)
        .filter(ExamRecord.id == exam_id)
        .first()
    )
    owned = exam is not None and exam.student.user_id == current_user.id
    if not owned:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")

    scores_by_subject = {entry.subject_id: entry for entry in exam.scores}
    for review in data.reviews:
        target = scores_by_subject.get(review.subject_id)
        if target is not None:
            target.review_statuses_json = _serialize_review_statuses(review.review_statuses)
            continue
        # Raised before commit, so earlier assignments in this loop are not persisted here.
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=f"科目 {review.subject_id} 不在该次考试中",
        )

    db.commit()
    db.refresh(exam)

    reloaded = (
        db.query(ExamRecord)
        .options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
        .filter(ExamRecord.id == exam.id)
        .first()
    )
    return _exam_to_out(reloaded)
@router.post("/students/{student_id}/review-insight", response_model=ReviewInsightResponse)
async def review_insight(
    student_id: uuid.UUID,
    data: ReviewInsightRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Generate an AI insight from a student's reviewed scores in one subject.

    Responds with 400 when the subject has no reviewed scores and with 502
    when the LLM service call raises.
    """
    student = get_student_for_user(db, student_id, current_user.id)

    eager_scores = joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
    student_exams = (
        db.query(ExamRecord)
        .options(eager_scores)
        .filter(ExamRecord.student_id == student_id)
        .all()
    )

    subject_name = data.subject_name.strip()
    records_text = _build_review_records_text(student_exams, subject_name)
    if not records_text:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="该科目暂无复盘数据",
        )

    ai_cfg = llm_service.load_ai_config(db)
    try:
        insight_text = await llm_service.generate_review_insight(
            ai_cfg,
            subject_name,
            records_text,
            student.school_level,
        )
    except Exception as exc:
        # NOTE(review): the raw exception text is echoed to the client —
        # confirm it cannot contain provider secrets or internal URLs.
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY,
            detail=f"AI 调用失败: {exc}",
        ) from exc
    return ReviewInsightResponse(insight=insight_text)
@router.delete("/exams/{exam_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_exam(
    exam_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete an exam record.

    Responds with 404 when the exam is missing or owned by another user.
    """
    lookup = db.query(ExamRecord).join(ExamRecord.student)
    target = lookup.filter(ExamRecord.id == exam_id).first()
    owned = target is not None and target.student.user_id == current_user.id
    if not owned:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")
    db.delete(target)
    db.commit()
@router.get("/students/{student_id}/scores/trend", response_model=TrendResponse)
def get_score_trend(
    student_id: uuid.UUID,
    subject_id: int = Query(...),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the score trend of one subject for a student.

    A ``ValueError`` raised by ``build_trend`` is translated into a 404 whose
    detail is the error message.
    """
    get_student_for_user(db, student_id, current_user.id)
    try:
        trend = build_trend(db, student_id, subject_id)
    except ValueError as exc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
    return trend