Files
secondary-school-grade-archive/backend/app/routers/wrong_questions.py
T
dekun c30e21b51e 修复错题图片显示、Tab 刷新跳转,奥数仅数学并支持删除。
- 图片通过带 Token 的 blob 请求加载,修复不显示

- URL ?tab= 保持当前标签,刷新列表不再重置到成绩录入

- 奥数区固定数学科目;错题卡片与详情增加删除
2026-06-28 13:47:53 +08:00

337 lines
12 KiB
Python

import asyncio
import logging
import uuid
from pathlib import Path

from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, Query, UploadFile, status
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session, joinedload

from app.core.config import settings
from app.core.database import SessionLocal, get_db
from app.core.deps import get_current_user
from app.models.user import Subject, User, WrongQuestion, WrongQuestionCategory, WrongQuestionStatus
from app.schemas import WrongQuestionCategoryEnum, WrongQuestionOut, WrongQuestionUpdate
from app.services import llm as llm_service
from app.services import ocr as ocr_service
from app.services.student_access import get_student_for_user
# Router on which every wrong-question endpoint in this module is registered.
router = APIRouter(tags=["wrong_questions"])
def _wq_to_out(wq: WrongQuestion) -> WrongQuestionOut:
    """Convert a ``WrongQuestion`` ORM row into its ``WrongQuestionOut`` schema.

    ``subject_name`` comes from the related subject when one is attached and
    is ``None`` otherwise; every other field is copied straight across.
    """
    subject = wq.subject
    fields = {
        "id": wq.id,
        "student_id": wq.student_id,
        "subject_id": wq.subject_id,
        "subject_name": subject.name if subject else None,
        "category": wq.category,
        "image_path": wq.image_path,
        "ocr_raw_text": wq.ocr_raw_text,
        "question_text": wq.question_text,
        "solution_text": wq.solution_text,
        "status": wq.status,
        "created_at": wq.created_at,
    }
    return WrongQuestionOut(**fields)
def _process_wrong_question(question_id: uuid.UUID):
    """Run OCR and AI post-processing for one wrong question.

    This is scheduled as a background task, so it opens (and always closes)
    its own database session and works in two stages:

    1. OCR the stored image. The text is saved to ``ocr_raw_text`` and the
       status becomes ``ocr_done``. An empty result or any error marks the
       record ``failed`` and stops processing.
    2. Ask the LLM service to format the question and generate a solution.
       On success both texts are stored and the status becomes ``solved``;
       on failure the record stays at ``ocr_done``.

    OCR and LLM failures are logged instead of raised because nothing awaits
    the result of this task.

    Args:
        question_id: Primary key of the ``WrongQuestion`` to process. If no
            such row exists the function returns without doing anything.
    """
    logger = logging.getLogger(__name__)
    db = SessionLocal()
    try:
        wq = (
            db.query(WrongQuestion)
            .options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
            .filter(WrongQuestion.id == question_id)
            .first()
        )
        if wq is None:
            return

        # --- Stage 1: OCR -------------------------------------------------
        image_full = Path(settings.UPLOAD_DIR) / wq.image_path
        try:
            ocr_text = ocr_service.run_ocr(str(image_full))
            wq.ocr_raw_text = ocr_text or None
            wq.status = (
                WrongQuestionStatus.ocr_done if ocr_text else WrongQuestionStatus.failed
            )
            db.commit()
        except Exception:
            logger.exception("OCR failed for wrong question %s", question_id)
            # Drop any half-applied changes so the session can commit again.
            db.rollback()
            wq.status = WrongQuestionStatus.failed
            db.commit()
            return
        if not ocr_text:
            return

        # --- Stage 2: LLM formatting + solution ---------------------------
        subject_name = wq.subject.name if wq.subject else "综合"
        school_level = wq.student.school_level if wq.student else None
        olympiad = wq.category == WrongQuestionCategory.olympiad
        ai_cfg = llm_service.load_ai_config(db)

        async def _format_and_solve():
            """Format the OCR text, then generate a solution for the result."""
            formatted = await llm_service.format_question(
                ai_cfg, subject_name, ocr_text, school_level
            )
            solved = await llm_service.generate_solution(
                ai_cfg,
                subject_name,
                formatted,
                school_level,
                olympiad=olympiad,
            )
            return formatted, solved

        try:
            # asyncio.run creates a fresh loop, closes it and un-registers it
            # afterwards, so no closed loop is left installed on this thread.
            question_text, solution_text = asyncio.run(_format_and_solve())
            wq.question_text = question_text
            wq.solution_text = solution_text
            wq.status = WrongQuestionStatus.solved
            db.commit()
        except Exception:
            logger.exception(
                "AI post-processing failed for wrong question %s", question_id
            )
            # Keep the OCR result; discard anything pending from this stage.
            db.rollback()
            wq.status = WrongQuestionStatus.ocr_done
            db.commit()
    finally:
        db.close()
@router.get("/students/{student_id}/wrong-questions", response_model=list[WrongQuestionOut])
def list_wrong_questions(
    student_id: uuid.UUID,
    subject_id: int | None = Query(None),
    category: WrongQuestionCategoryEnum | None = Query(None),
    q: str | None = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """List a student's wrong questions, newest first.

    Args:
        student_id: Student whose wrong questions are listed.
        subject_id: Optional filter on the subject id.
        category: Optional filter on the question category.
        q: Optional keyword; matched case-insensitively as a literal
            substring of the question text, solution text or raw OCR text.
        db: Request-scoped database session.
        current_user: Authenticated user making the request.

    Returns:
        The matching questions as ``WrongQuestionOut`` objects ordered by
        ``created_at`` descending.
    """
    # Access check is delegated to the helper; its return value is not used.
    # NOTE(review): presumably raises when the student is not accessible to
    # this user - confirm in app.services.student_access.
    get_student_for_user(db, student_id, current_user.id)

    query = (
        db.query(WrongQuestion)
        .options(joinedload(WrongQuestion.subject))
        .filter(WrongQuestion.student_id == student_id)
    )
    if subject_id is not None:
        query = query.filter(WrongQuestion.subject_id == subject_id)
    if category is not None:
        query = query.filter(WrongQuestion.category == category.value)
    if q:
        # Escape LIKE metacharacters so that "%" and "_" typed by the user
        # are matched literally instead of acting as wildcards.
        escaped = q.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        pattern = f"%{escaped}%"
        query = query.filter(
            WrongQuestion.question_text.ilike(pattern, escape="\\")
            | WrongQuestion.solution_text.ilike(pattern, escape="\\")
            | WrongQuestion.ocr_raw_text.ilike(pattern, escape="\\")
        )
    items = query.order_by(WrongQuestion.created_at.desc()).all()
    return [_wq_to_out(w) for w in items]
@router.post(
    "/students/{student_id}/wrong-questions",
    response_model=WrongQuestionOut,
    status_code=status.HTTP_201_CREATED,
)
async def upload_wrong_question(
    student_id: uuid.UUID,
    background_tasks: BackgroundTasks,
    subject_id: int = Form(...),
    file: UploadFile = File(...),
    category: WrongQuestionCategoryEnum = Form(WrongQuestionCategoryEnum.regular),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Store an uploaded wrong-question image and queue it for processing.

    The request is rejected with HTTP 400 when the subject does not exist,
    when an olympiad question is filed under a subject other than "数学", or
    when the upload is larger than ``settings.MAX_UPLOAD_SIZE``.

    Returns:
        The new record (status ``pending``) as ``WrongQuestionOut``. OCR and
        AI processing run afterwards as a background task.
    """
    get_student_for_user(db, student_id, current_user.id)

    subject = db.get(Subject, subject_id)
    if subject is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="科目不存在")

    is_olympiad = category == WrongQuestionCategoryEnum.olympiad
    if is_olympiad and subject.name != "数学":
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="奥数区仅支持数学科目")

    payload = await file.read()
    if len(payload) > settings.MAX_UPLOAD_SIZE:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件超过10MB限制")

    record = WrongQuestion(
        student_id=student_id,
        subject_id=subject_id,
        image_path="",
        category=WrongQuestionCategory(category.value),
        status=WrongQuestionStatus.pending,
    )
    db.add(record)
    # Flush before saving the file: the record's id is passed to the storage
    # helper right below, and the real path is only known after the save.
    db.flush()

    original_name = file.filename or "image.jpg"
    record.image_path = ocr_service.save_upload_file(
        str(current_user.id), str(record.id), original_name, payload
    )
    db.commit()
    db.refresh(record)

    # Re-read the row with its subject eagerly loaded for the response.
    with_subject = db.query(WrongQuestion).options(joinedload(WrongQuestion.subject))
    record = with_subject.filter(WrongQuestion.id == record.id).first()

    background_tasks.add_task(_process_wrong_question, record.id)
    return _wq_to_out(record)
@router.get("/wrong-questions/{question_id}", response_model=WrongQuestionOut)
def get_wrong_question(
    question_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return a single wrong question owned by the current user.

    Raises:
        HTTPException: 404 when the question does not exist or its student
            belongs to a different user (both cases look the same to the
            caller).
    """
    lookup = db.query(WrongQuestion).options(
        joinedload(WrongQuestion.subject),
        joinedload(WrongQuestion.student),
    )
    wq = lookup.filter(WrongQuestion.id == question_id).first()

    owned_by_caller = wq is not None and wq.student.user_id == current_user.id
    if not owned_by_caller:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
    return _wq_to_out(wq)
@router.patch("/wrong-questions/{question_id}", response_model=WrongQuestionOut)
def update_wrong_question(
    question_id: uuid.UUID,
    data: WrongQuestionUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Partially update a wrong question owned by the current user.

    Only fields that are not ``None`` in ``data`` are applied
    (``subject_id``, ``question_text``, ``solution_text``).

    Raises:
        HTTPException: 404 when the question does not exist or belongs to
            another user; 400 when the new subject does not exist, or when
            an olympiad question would be moved to a subject other than
            "数学" (the same rule the upload endpoint enforces).
    """
    wq = (
        db.query(WrongQuestion)
        .options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
        .filter(WrongQuestion.id == question_id)
        .first()
    )
    if wq is None or wq.student.user_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")

    if data.subject_id is not None:
        new_subject = db.get(Subject, data.subject_id)
        if new_subject is None:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="科目不存在")
        # Keep the olympiad-is-math-only rule from upload. It is checked only
        # when the subject actually changes, so re-sending the current value
        # never starts failing.
        subject_changes = data.subject_id != wq.subject_id
        if (
            subject_changes
            and wq.category == WrongQuestionCategory.olympiad
            and new_subject.name != "数学"
        ):
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST, detail="奥数区仅支持数学科目"
            )
        wq.subject_id = data.subject_id
    if data.question_text is not None:
        wq.question_text = data.question_text
    if data.solution_text is not None:
        wq.solution_text = data.solution_text

    db.commit()
    db.refresh(wq)
    return _wq_to_out(wq)
@router.delete("/wrong-questions/{question_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_wrong_question(
    question_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Delete a wrong question owned by the current user and its image file.

    The database row is removed first; the image is then removed on a
    best-effort basis, so a filesystem problem never turns an already
    committed delete into an error response.

    Raises:
        HTTPException: 404 when the question does not exist or belongs to
            another user.
    """
    wq = (
        db.query(WrongQuestion)
        .options(joinedload(WrongQuestion.student))
        .filter(WrongQuestion.id == question_id)
        .first()
    )
    if wq is None or wq.student.user_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")

    # Resolve the path before the delete, while the row's attributes are
    # still readable.
    image_full = Path(settings.UPLOAD_DIR) / wq.image_path
    db.delete(wq)
    db.commit()

    # is_file() (not exists()) so that an empty image_path, which resolves to
    # the upload directory itself, is never passed to unlink().
    if image_full.is_file():
        try:
            # missing_ok covers the file vanishing between check and unlink.
            image_full.unlink(missing_ok=True)
        except OSError:
            logging.getLogger(__name__).warning(
                "Could not remove image %s of deleted wrong question %s",
                image_full,
                question_id,
                exc_info=True,
            )
@router.post("/wrong-questions/{question_id}/retry-ocr", response_model=WrongQuestionOut)
def retry_ocr(
    question_id: uuid.UUID,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Reset a wrong question to ``pending`` and queue processing again.

    Returns:
        The record, with its status set to ``pending``, as
        ``WrongQuestionOut``; the processing itself runs as a background task.

    Raises:
        HTTPException: 404 when the question does not exist or belongs to
            another user.
    """
    eager = (joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
    wq = (
        db.query(WrongQuestion)
        .options(*eager)
        .filter(WrongQuestion.id == question_id)
        .first()
    )

    if wq is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
    if wq.student.user_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")

    wq.status = WrongQuestionStatus.pending
    db.commit()

    background_tasks.add_task(_process_wrong_question, wq.id)
    return _wq_to_out(wq)
@router.post("/wrong-questions/{question_id}/regenerate-solution", response_model=WrongQuestionOut)
async def regenerate_solution(
    question_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Regenerate the AI solution for a wrong question.

    When the record has OCR text but no formatted question yet, the question
    is formatted first and that result is used as the solution prompt. On
    success the status becomes ``solved``.

    Raises:
        HTTPException: 404 when the question does not exist or belongs to
            another user; 400 when it has neither question text nor OCR
            text; 502 when an LLM call fails.
    """
    wq = (
        db.query(WrongQuestion)
        .options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
        .filter(WrongQuestion.id == question_id)
        .first()
    )
    if wq is None or wq.student.user_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
    if not (wq.question_text or wq.ocr_raw_text):
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="缺少题目内容")

    if wq.subject:
        subject_name = wq.subject.name
    else:
        subject_name = "综合"
    if wq.student:
        school_level = wq.student.school_level
    else:
        school_level = None
    olympiad = wq.category == WrongQuestionCategory.olympiad

    # Prefer the formatted question; otherwise start from the raw OCR text.
    prompt_text = wq.question_text or wq.ocr_raw_text or ""
    ai_cfg = llm_service.load_ai_config(db)

    try:
        needs_formatting = not wq.question_text and wq.ocr_raw_text
        if needs_formatting:
            wq.question_text = await llm_service.format_question(
                ai_cfg, subject_name, wq.ocr_raw_text, school_level
            )
            prompt_text = wq.question_text
        wq.solution_text = await llm_service.generate_solution(
            ai_cfg,
            subject_name,
            prompt_text,
            school_level,
            olympiad=olympiad,
        )
        wq.status = WrongQuestionStatus.solved
    except Exception as exc:
        raise HTTPException(
            status_code=status.HTTP_502_BAD_GATEWAY, detail=f"AI 调用失败: {exc}"
        ) from exc

    db.commit()
    db.refresh(wq)
    return _wq_to_out(wq)
@router.get("/wrong-questions/{question_id}/image")
def get_wrong_question_image(
    question_id: uuid.UUID,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Serve the stored image of a wrong question owned by the current user.

    Returns:
        A ``FileResponse`` for the image file under ``settings.UPLOAD_DIR``.

    Raises:
        HTTPException: 404 ("错题不存在") when the question does not exist or
            belongs to another user; 404 ("图片不存在") when the stored path
            does not point at a regular file.
    """
    wq = (
        db.query(WrongQuestion)
        .options(joinedload(WrongQuestion.student))
        .filter(WrongQuestion.id == question_id)
        .first()
    )
    if wq is None or wq.student.user_id != current_user.id:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")

    image_full = Path(settings.UPLOAD_DIR) / wq.image_path
    # is_file() rather than exists(): an empty image_path resolves to the
    # upload directory itself, which exists but cannot be served as a file.
    if not image_full.is_file():
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="图片不存在")
    return FileResponse(image_full)