import json
import uuid

from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session, joinedload

from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import ExamRecord, SubjectScore, User
from app.schemas import (
    ExamCreate,
    ExamOut,
    ExamReviewUpdate,
    ExamUpdate,
    ReviewStatusEnum,
    ScoreOut,
    TrendResponse,
)
from app.services.score_trend import build_trend
from app.services.student_access import get_student_for_user

router = APIRouter(tags=["exams"])


def _parse_review_statuses(raw: str | None) -> list[ReviewStatusEnum]:
    """Decode the stored JSON text into a list of review statuses.

    Returns an empty list when ``raw`` is empty, is not valid JSON, or does
    not decode to a list. Entries that are not valid ``ReviewStatusEnum``
    values are skipped rather than failing the whole parse.
    """
    if not raw:
        return []
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return []
    if not isinstance(data, list):
        return []
    result: list[ReviewStatusEnum] = []
    for item in data:
        try:
            result.append(ReviewStatusEnum(str(item)))
        except ValueError:
            # Unknown status value in stored data: drop it, keep the rest.
            continue
    return result


def _serialize_review_statuses(statuses: list[ReviewStatusEnum] | None) -> str | None:
    """Encode review statuses as a JSON array string for storage.

    Returns ``None`` when ``statuses`` is ``None`` or empty. Enum members are
    stored by their ``value``; anything else is stored as ``str(item)``.
    """
    if not statuses:
        return None
    values = [
        item.value if isinstance(item, ReviewStatusEnum) else str(item)
        for item in statuses
    ]
    return json.dumps(values, ensure_ascii=False)


def _score_to_out(score: SubjectScore) -> ScoreOut:
    """Convert a ``SubjectScore`` row into its ``ScoreOut`` response schema."""
    return ScoreOut(
        id=score.id,
        subject_id=score.subject_id,
        # The subject relationship may be empty; report no name in that case.
        subject_name=score.subject.name if score.subject else None,
        total_score=float(score.total_score),
        obtained_score=float(score.obtained_score),
        ratio=float(score.ratio),
        review_statuses=_parse_review_statuses(score.review_statuses_json),
    )


def _exam_to_out(exam: ExamRecord) -> ExamOut:
    """Convert an ``ExamRecord`` and its scores into an ``ExamOut`` schema."""
    return ExamOut(
        id=exam.id,
        exam_type=exam.exam_type,
        exam_date=exam.exam_date,
        title=exam.title,
        created_at=exam.created_at,
        scores=[_score_to_out(s) for s in exam.scores],
    )


def _apply_scores(db: Session, exam: ExamRecord, scores_data) -> None:
    """Make ``exam.scores`` match ``scores_data``, keyed by subject id.

    Each item is expected to expose ``subject_id``, ``total_score``,
    ``obtained_score`` and ``review_statuses``. Scores for subjects already on
    the exam are updated in place, new subjects are appended, and stored
    scores whose subject is absent from ``scores_data`` are removed and
    deleted. Nothing is committed here.

    If the same ``subject_id`` appears more than once in ``scores_data``, the
    last entry wins.

    Raises:
        HTTPException: 400 when an item has a ``total_score`` of zero, since
            the ratio cannot be computed.
    """
    by_subject = {s.subject_id: s for s in exam.scores}
    keep_subject_ids: set[int] = set()

    for item in scores_data:
        if item.total_score == 0:
            # Reject explicitly instead of failing with ZeroDivisionError.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"科目 {item.subject_id} 的总分不能为 0",
            )
        keep_subject_ids.add(item.subject_id)
        ratio = round(item.obtained_score / item.total_score, 4)
        review_json = _serialize_review_statuses(item.review_statuses)

        existing = by_subject.get(item.subject_id)
        if existing is not None:
            existing.total_score = item.total_score
            existing.obtained_score = item.obtained_score
            existing.ratio = ratio
            existing.review_statuses_json = review_json
        else:
            created = SubjectScore(
                subject_id=item.subject_id,
                total_score=item.total_score,
                obtained_score=item.obtained_score,
                ratio=ratio,
                review_statuses_json=review_json,
            )
            exam.scores.append(created)
            # Track the new row so a repeated subject_id later in the payload
            # updates it instead of appending a second row for the subject.
            by_subject[item.subject_id] = created

    for subject_id, score in by_subject.items():
        if subject_id not in keep_subject_ids:
            exam.scores.remove(score)
            db.delete(score)


def _get_owned_exam(
    db: Session,
    exam_id: uuid.UUID,
    user_id,
    *,
    load_scores: bool = True,
) -> ExamRecord:
    """Fetch an exam by id and check that it belongs to ``user_id``.

    With ``load_scores`` the scores and their subjects are eagerly loaded in
    the same query.

    Raises:
        HTTPException: 404 when the exam does not exist or its student's
            ``user_id`` differs from ``user_id``. Both cases share one error
            so the response does not distinguish them.
    """
    query = db.query(ExamRecord)
    if load_scores:
        query = query.options(
            joinedload(ExamRecord.scores).joinedload(SubjectScore.subject)
        )
    exam = (
        query.join(ExamRecord.student)
        .filter(ExamRecord.id == exam_id)
        .first()
    )
    if exam is None or exam.student.user_id != user_id:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="考试记录不存在",
        )
    return exam


def _commit_and_reload(db: Session, exam: ExamRecord) -> ExamOut:
    """Commit pending changes and return the exam serialized from a fresh load.

    The exam is re-queried with scores and subjects eagerly loaded before it
    is converted to ``ExamOut``.
    """
    db.commit()
    db.refresh(exam)
    reloaded = (
        db.query(ExamRecord)
        .options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
        .filter(ExamRecord.id == exam.id)
        .first()
    )
    return _exam_to_out(reloaded)


@router.get("/students/{student_id}/exams", response_model=list[ExamOut])
def list_exams(
    student_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List a student's exams, newest exam date first, then newest created."""
    # Access check; presumably raises when the student is not accessible to
    # this user - confirm in app.services.student_access.
    get_student_for_user(db, student_id, current_user.id)
    exams = (
        db.query(ExamRecord)
        .options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
        .filter(ExamRecord.student_id == student_id)
        .order_by(ExamRecord.exam_date.desc(), ExamRecord.created_at.desc())
        .all()
    )
    return [_exam_to_out(e) for e in exams]


@router.post(
    "/students/{student_id}/exams",
    response_model=ExamOut,
    status_code=status.HTTP_201_CREATED,
)
def create_exam(
    student_id: uuid.UUID,
    data: ExamCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Create an exam for a student, with optional per-subject scores."""
    # Access check; presumably raises when the student is not accessible to
    # this user - confirm in app.services.student_access.
    get_student_for_user(db, student_id, current_user.id)
    exam = ExamRecord(
        student_id=student_id,
        exam_type=data.exam_type,
        exam_date=data.exam_date,
        title=data.title,
    )
    db.add(exam)
    # Flush so the exam row exists before scores are attached to it.
    db.flush()
    if data.scores:
        _apply_scores(db, exam, data.scores)
    return _commit_and_reload(db, exam)


@router.get("/exams/{exam_id}", response_model=ExamOut)
def get_exam(
    exam_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one exam owned by the current user, or 404."""
    exam = _get_owned_exam(db, exam_id, current_user.id)
    return _exam_to_out(exam)


@router.patch("/exams/{exam_id}", response_model=ExamOut)
def update_exam(
    exam_id: uuid.UUID,
    data: ExamUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Partially update an exam; fields left as ``None`` are not changed.

    When ``data.scores`` is provided (even as an empty list) the exam's scores
    are synchronized to exactly that list.
    """
    exam = _get_owned_exam(db, exam_id, current_user.id)
    if data.exam_type is not None:
        exam.exam_type = data.exam_type
    if data.exam_date is not None:
        exam.exam_date = data.exam_date
    if data.title is not None:
        exam.title = data.title
    if data.scores is not None:
        _apply_scores(db, exam, data.scores)
    return _commit_and_reload(db, exam)


@router.patch("/exams/{exam_id}/review", response_model=ExamOut)
def update_exam_review(
    exam_id: uuid.UUID,
    data: ExamReviewUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Replace the review statuses of the listed subjects on an exam.

    Raises:
        HTTPException: 404 when the exam is missing or not owned by the
            current user; 400 when a review refers to a subject that has no
            score on this exam.
    """
    exam = _get_owned_exam(db, exam_id, current_user.id)
    by_subject = {s.subject_id: s for s in exam.scores}
    for item in data.reviews:
        score = by_subject.get(item.subject_id)
        if score is None:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=f"科目 {item.subject_id} 不在该次考试中",
            )
        score.review_statuses_json = _serialize_review_statuses(item.review_statuses)
    return _commit_and_reload(db, exam)


@router.delete("/exams/{exam_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_exam(
    exam_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete an exam owned by the current user, or 404."""
    # Scores are not needed for a delete, so skip the eager load.
    exam = _get_owned_exam(db, exam_id, current_user.id, load_scores=False)
    db.delete(exam)
    db.commit()


@router.get("/students/{student_id}/scores/trend", response_model=TrendResponse)
def get_score_trend(
    student_id: uuid.UUID,
    subject_id: int = Query(...),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return the score trend of one subject for a student.

    A ``ValueError`` raised by ``build_trend`` is reported as a 404 carrying
    the error's message.
    """
    # Access check; presumably raises when the student is not accessible to
    # this user - confirm in app.services.student_access.
    get_student_for_user(db, student_id, current_user.id)
    try:
        return build_trend(db, student_id, subject_id)
    except ValueError as exc:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=str(exc),
        ) from exc