Initial commit: secondary school grade archive system.

Add FastAPI/React app with Docker deployment, Ubuntu one-click install, and docs for junior/senior high score tracking and mistake bank.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-28 11:18:58 +08:00
commit e329d3398a
76 changed files with 8506 additions and 0 deletions
+7
View File
@@ -0,0 +1,7 @@
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/student_archive
SECRET_KEY=dev-secret-key-change-in-production
CORS_ORIGINS=http://localhost:5173,http://localhost:3000
UPLOAD_DIR=uploads
OLLAMA_BASE_URL=http://127.0.0.1:11434
OLLAMA_MODEL=qwen2.5:7b
FLUCTUATION_THRESHOLD=0.08
+19
View File
@@ -0,0 +1,19 @@
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
libgomp1 libglib2.0-0 libsm6 libxext6 libxrender-dev \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app ./app
ENV UPLOAD_DIR=/app/uploads
RUN mkdir -p /app/uploads
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
View File
View File
+21
View File
@@ -0,0 +1,21 @@
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
DATABASE_URL: str = "postgresql://postgres:postgres@localhost:5432/student_archive"
SECRET_KEY: str = "change-me-in-production-use-a-long-random-string"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
ALGORITHM: str = "HS256"
UPLOAD_DIR: str = "uploads"
MAX_UPLOAD_SIZE: int = 10 * 1024 * 1024
OLLAMA_BASE_URL: str = "http://host.docker.internal:11434"
OLLAMA_MODEL: str = "qwen2.5:7b"
FLUCTUATION_THRESHOLD: float = 0.08
CORS_ORIGINS: str = "http://localhost:5173,http://localhost:3000,http://localhost"
class Config:
env_file = ".env"
settings = Settings()
+19
View File
@@ -0,0 +1,19 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import DeclarativeBase, sessionmaker
from app.core.config import settings
engine = create_engine(settings.DATABASE_URL, pool_pre_ping=True)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
class Base(DeclarativeBase):
pass
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
+34
View File
@@ -0,0 +1,34 @@
import uuid
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.database import get_db
from app.models.user import User
security = HTTPBearer(auto_error=False)
def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(security),
db: Session = Depends(get_db),
) -> User:
if credentials is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="未登录")
token = credentials.credentials
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id: str | None = payload.get("sub")
token_type: str | None = payload.get("type")
if user_id is None or token_type != "access":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效令牌")
except JWTError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效令牌") from exc
user = db.get(User, uuid.UUID(user_id))
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在")
return user
+31
View File
@@ -0,0 +1,31 @@
from datetime import datetime, timedelta, timezone
from typing import Any
from jose import jwt
from passlib.context import CryptContext
from app.core.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(subject: str, expires_delta: timedelta | None = None) -> str:
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
payload: dict[str, Any] = {"sub": subject, "exp": expire, "type": "access"}
return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def create_refresh_token(subject: str) -> str:
expire = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
payload: dict[str, Any] = {"sub": subject, "exp": expire, "type": "refresh"}
return jwt.encode(payload, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
+48
View File
@@ -0,0 +1,48 @@
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.core.config import settings
from app.core.database import Base, SessionLocal, engine
from app.routers import auth, exams, export, students, subjects, wrong_questions
from app.services.migrate import run_migrations
from app.services.seed import seed_subjects
@asynccontextmanager
async def lifespan(app: FastAPI):
Path(settings.UPLOAD_DIR).mkdir(parents=True, exist_ok=True)
Base.metadata.create_all(bind=engine)
run_migrations()
db = SessionLocal()
try:
seed_subjects(db)
finally:
db.close()
yield
app = FastAPI(title="中学成绩档案", version="1.0.0", lifespan=lifespan)
origins = [o.strip() for o in settings.CORS_ORIGINS.split(",") if o.strip()]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth.router, prefix="/api")
app.include_router(students.router, prefix="/api")
app.include_router(subjects.router, prefix="/api")
app.include_router(exams.router, prefix="/api")
app.include_router(wrong_questions.router, prefix="/api")
app.include_router(export.router, prefix="/api")
@app.get("/api/health")
def health():
return {"status": "ok"}
View File
+130
View File
@@ -0,0 +1,130 @@
import enum
import uuid
from datetime import date, datetime, timezone
from sqlalchemy import Date, DateTime, Enum, ForeignKey, Integer, Numeric, String, Text, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.core.database import Base
class ExamType(str, enum.Enum):
weekly = "weekly"
monthly = "monthly"
final = "final"
class WrongQuestionStatus(str, enum.Enum):
pending = "pending"
ocr_done = "ocr_done"
solved = "solved"
failed = "failed"
class SchoolLevel(str, enum.Enum):
junior_high = "junior_high"
senior_high = "senior_high"
class User(Base):
__tablename__ = "users"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
username: Mapped[str] = mapped_column(String(64), unique=True, index=True)
password_hash: Mapped[str] = mapped_column(String(255))
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
students: Mapped[list["Student"]] = relationship(back_populates="user", cascade="all, delete-orphan")
class Student(Base):
__tablename__ = "students"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
user_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("users.id"), index=True)
name: Mapped[str] = mapped_column(String(64))
school_level: Mapped[SchoolLevel] = mapped_column(
Enum(SchoolLevel), default=SchoolLevel.junior_high
)
grade: Mapped[str | None] = mapped_column(String(32), nullable=True)
class_name: Mapped[str | None] = mapped_column(String(32), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
user: Mapped["User"] = relationship(back_populates="students")
exam_records: Mapped[list["ExamRecord"]] = relationship(
back_populates="student", cascade="all, delete-orphan"
)
wrong_questions: Mapped[list["WrongQuestion"]] = relationship(
back_populates="student", cascade="all, delete-orphan"
)
class Subject(Base):
__tablename__ = "subjects"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(32), unique=True)
scores: Mapped[list["SubjectScore"]] = relationship(back_populates="subject")
wrong_questions: Mapped[list["WrongQuestion"]] = relationship(back_populates="subject")
class ExamRecord(Base):
__tablename__ = "exam_records"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
student_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("students.id"), index=True)
exam_type: Mapped[ExamType] = mapped_column(Enum(ExamType))
exam_date: Mapped[date] = mapped_column(Date)
title: Mapped[str | None] = mapped_column(String(128), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
student: Mapped["Student"] = relationship(back_populates="exam_records")
scores: Mapped[list["SubjectScore"]] = relationship(
back_populates="exam_record", cascade="all, delete-orphan"
)
class SubjectScore(Base):
__tablename__ = "subject_scores"
__table_args__ = (UniqueConstraint("exam_record_id", "subject_id", name="uq_exam_subject"),)
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
exam_record_id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True), ForeignKey("exam_records.id"), index=True
)
subject_id: Mapped[int] = mapped_column(Integer, ForeignKey("subjects.id"))
total_score: Mapped[float] = mapped_column(Numeric(8, 2))
obtained_score: Mapped[float] = mapped_column(Numeric(8, 2))
ratio: Mapped[float] = mapped_column(Numeric(8, 4))
exam_record: Mapped["ExamRecord"] = relationship(back_populates="scores")
subject: Mapped["Subject"] = relationship(back_populates="scores")
class WrongQuestion(Base):
__tablename__ = "wrong_questions"
id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
student_id: Mapped[uuid.UUID] = mapped_column(UUID(as_uuid=True), ForeignKey("students.id"), index=True)
subject_id: Mapped[int] = mapped_column(Integer, ForeignKey("subjects.id"))
image_path: Mapped[str] = mapped_column(String(512))
ocr_raw_text: Mapped[str | None] = mapped_column(Text, nullable=True)
question_text: Mapped[str | None] = mapped_column(Text, nullable=True)
solution_text: Mapped[str | None] = mapped_column(Text, nullable=True)
status: Mapped[WrongQuestionStatus] = mapped_column(
Enum(WrongQuestionStatus), default=WrongQuestionStatus.pending
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
)
student: Mapped["Student"] = relationship(back_populates="wrong_questions")
subject: Mapped["Subject"] = relationship(back_populates="wrong_questions")
View File
+67
View File
@@ -0,0 +1,67 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from jose import JWTError, jwt
from sqlalchemy.orm import Session
from app.core.config import settings
from app.core.database import get_db
from app.core.deps import get_current_user
from app.core.security import (
create_access_token,
create_refresh_token,
get_password_hash,
verify_password,
)
from app.models.user import User
from app.schemas import RefreshRequest, TokenResponse, UserLogin, UserOut, UserRegister
router = APIRouter(prefix="/auth", tags=["auth"])
@router.post("/register", response_model=UserOut, status_code=status.HTTP_201_CREATED)
def register(data: UserRegister, db: Session = Depends(get_db)):
if db.query(User).filter(User.username == data.username).first():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="用户名已存在")
user = User(username=data.username, password_hash=get_password_hash(data.password))
db.add(user)
db.commit()
db.refresh(user)
return user
@router.post("/login", response_model=TokenResponse)
def login(data: UserLogin, db: Session = Depends(get_db)):
user = db.query(User).filter(User.username == data.username).first()
if user is None or not verify_password(data.password, user.password_hash):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户名或密码错误")
return TokenResponse(
access_token=create_access_token(str(user.id)),
refresh_token=create_refresh_token(str(user.id)),
)
@router.post("/refresh", response_model=TokenResponse)
def refresh(data: RefreshRequest, db: Session = Depends(get_db)):
try:
payload = jwt.decode(data.refresh_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
user_id = payload.get("sub")
token_type = payload.get("type")
if user_id is None or token_type != "refresh":
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效刷新令牌")
except JWTError as exc:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="无效刷新令牌") from exc
user = db.get(User, uuid.UUID(user_id))
if user is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在")
return TokenResponse(
access_token=create_access_token(str(user.id)),
refresh_token=create_refresh_token(str(user.id)),
)
@router.get("/me", response_model=UserOut)
def me(current_user: User = Depends(get_current_user)):
return current_user
+182
View File
@@ -0,0 +1,182 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy.orm import Session, joinedload
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import ExamRecord, SubjectScore, User
from app.schemas import ExamCreate, ExamOut, ExamUpdate, ScoreOut, TrendResponse
from app.services.score_trend import build_trend
from app.services.student_access import get_student_for_user
router = APIRouter(tags=["exams"])
def _score_to_out(score: SubjectScore) -> ScoreOut:
return ScoreOut(
id=score.id,
subject_id=score.subject_id,
subject_name=score.subject.name if score.subject else None,
total_score=float(score.total_score),
obtained_score=float(score.obtained_score),
ratio=float(score.ratio),
)
def _exam_to_out(exam: ExamRecord) -> ExamOut:
return ExamOut(
id=exam.id,
exam_type=exam.exam_type,
exam_date=exam.exam_date,
title=exam.title,
created_at=exam.created_at,
scores=[_score_to_out(s) for s in exam.scores],
)
def _apply_scores(db: Session, exam: ExamRecord, scores_data):
exam.scores.clear()
for item in scores_data:
ratio = round(item.obtained_score / item.total_score, 4)
exam.scores.append(
SubjectScore(
subject_id=item.subject_id,
total_score=item.total_score,
obtained_score=item.obtained_score,
ratio=ratio,
)
)
@router.get("/students/{student_id}/exams", response_model=list[ExamOut])
def list_exams(
student_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
get_student_for_user(db, student_id, current_user.id)
exams = (
db.query(ExamRecord)
.options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
.filter(ExamRecord.student_id == student_id)
.order_by(ExamRecord.exam_date.desc(), ExamRecord.created_at.desc())
.all()
)
return [_exam_to_out(e) for e in exams]
@router.post("/students/{student_id}/exams", response_model=ExamOut, status_code=status.HTTP_201_CREATED)
def create_exam(
student_id: uuid.UUID,
data: ExamCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
get_student_for_user(db, student_id, current_user.id)
exam = ExamRecord(
student_id=student_id,
exam_type=data.exam_type,
exam_date=data.exam_date,
title=data.title,
)
db.add(exam)
db.flush()
if data.scores:
_apply_scores(db, exam, data.scores)
db.commit()
db.refresh(exam)
exam = (
db.query(ExamRecord)
.options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
.filter(ExamRecord.id == exam.id)
.first()
)
return _exam_to_out(exam)
@router.get("/exams/{exam_id}", response_model=ExamOut)
def get_exam(
exam_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
exam = (
db.query(ExamRecord)
.options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
.join(ExamRecord.student)
.filter(ExamRecord.id == exam_id)
.first()
)
if exam is None or exam.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")
return _exam_to_out(exam)
@router.patch("/exams/{exam_id}", response_model=ExamOut)
def update_exam(
exam_id: uuid.UUID,
data: ExamUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
exam = (
db.query(ExamRecord)
.options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
.join(ExamRecord.student)
.filter(ExamRecord.id == exam_id)
.first()
)
if exam is None or exam.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")
if data.exam_type is not None:
exam.exam_type = data.exam_type
if data.exam_date is not None:
exam.exam_date = data.exam_date
if data.title is not None:
exam.title = data.title
if data.scores is not None:
_apply_scores(db, exam, data.scores)
db.commit()
db.refresh(exam)
exam = (
db.query(ExamRecord)
.options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
.filter(ExamRecord.id == exam.id)
.first()
)
return _exam_to_out(exam)
@router.delete("/exams/{exam_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_exam(
exam_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
exam = (
db.query(ExamRecord)
.join(ExamRecord.student)
.filter(ExamRecord.id == exam_id)
.first()
)
if exam is None or exam.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="考试记录不存在")
db.delete(exam)
db.commit()
@router.get("/students/{student_id}/scores/trend", response_model=TrendResponse)
def get_score_trend(
student_id: uuid.UUID,
subject_id: int = Query(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
get_student_for_user(db, student_id, current_user.id)
try:
return build_trend(db, student_id, subject_id)
except ValueError as exc:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(exc)) from exc
+54
View File
@@ -0,0 +1,54 @@
import csv
import io
import uuid
from fastapi import APIRouter, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session, joinedload
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import ExamRecord, SubjectScore, User
from app.services.student_access import get_student_for_user
router = APIRouter(tags=["export"])
@router.get("/students/{student_id}/scores/export")
def export_scores_csv(
student_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
student = get_student_for_user(db, student_id, current_user.id)
exams = (
db.query(ExamRecord)
.options(joinedload(ExamRecord.scores).joinedload(SubjectScore.subject))
.filter(ExamRecord.student_id == student_id)
.order_by(ExamRecord.exam_date.asc())
.all()
)
output = io.StringIO()
writer = csv.writer(output)
writer.writerow(["考试日期", "考试类型", "标题", "科目", "总分", "得分", "占比"])
type_map = {"weekly": "周考", "monthly": "月考", "final": "期末"}
for exam in exams:
for score in exam.scores:
writer.writerow([
exam.exam_date.isoformat(),
type_map.get(exam.exam_type.value, exam.exam_type.value),
exam.title or "",
score.subject.name if score.subject else "",
float(score.total_score),
float(score.obtained_score),
f"{float(score.ratio) * 100:.2f}%",
])
output.seek(0)
filename = f"{student.name}_scores.csv"
return StreamingResponse(
iter([output.getvalue().encode("utf-8-sig")]),
media_type="text/csv",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)
+73
View File
@@ -0,0 +1,73 @@
import uuid
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import Student, User
from app.schemas import StudentCreate, StudentOut, StudentUpdate
from app.services.student_access import get_student_for_user
router = APIRouter(prefix="/students", tags=["students"])
@router.get("", response_model=list[StudentOut])
def list_students(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return (
db.query(Student)
.filter(Student.user_id == current_user.id)
.order_by(Student.created_at.desc())
.all()
)
@router.post("", response_model=StudentOut, status_code=status.HTTP_201_CREATED)
def create_student(
data: StudentCreate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
student = Student(user_id=current_user.id, **data.model_dump())
db.add(student)
db.commit()
db.refresh(student)
return student
@router.get("/{student_id}", response_model=StudentOut)
def get_student(
student_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return get_student_for_user(db, student_id, current_user.id)
@router.patch("/{student_id}", response_model=StudentOut)
def update_student(
student_id: uuid.UUID,
data: StudentUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
student = get_student_for_user(db, student_id, current_user.id)
for key, value in data.model_dump(exclude_unset=True).items():
setattr(student, key, value)
db.commit()
db.refresh(student)
return student
@router.delete("/{student_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_student(
student_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
student = get_student_for_user(db, student_id, current_user.id)
db.delete(student)
db.commit()
+17
View File
@@ -0,0 +1,17 @@
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.core.deps import get_current_user
from app.models.user import Subject, User
from app.schemas import SubjectOut
router = APIRouter(prefix="/subjects", tags=["subjects"])
@router.get("", response_model=list[SubjectOut])
def list_subjects(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
return db.query(Subject).order_by(Subject.id).all()
+313
View File
@@ -0,0 +1,313 @@
import uuid
from pathlib import Path
from fastapi import APIRouter, BackgroundTasks, Depends, File, Form, HTTPException, Query, UploadFile, status
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session, joinedload
from app.core.config import settings
from app.core.database import SessionLocal, get_db
from app.core.deps import get_current_user
from app.models.user import Subject, User, WrongQuestion, WrongQuestionStatus
from app.schemas import WrongQuestionOut, WrongQuestionUpdate
from app.services import ocr as ocr_service
from app.services import ollama as ollama_service
from app.services.student_access import get_student_for_user
router = APIRouter(tags=["wrong_questions"])
def _wq_to_out(wq: WrongQuestion) -> WrongQuestionOut:
return WrongQuestionOut(
id=wq.id,
student_id=wq.student_id,
subject_id=wq.subject_id,
subject_name=wq.subject.name if wq.subject else None,
image_path=wq.image_path,
ocr_raw_text=wq.ocr_raw_text,
question_text=wq.question_text,
solution_text=wq.solution_text,
status=wq.status,
created_at=wq.created_at,
)
def _process_wrong_question(question_id: uuid.UUID):
db = SessionLocal()
try:
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None:
return
image_full = Path(settings.UPLOAD_DIR) / wq.image_path
try:
ocr_text = ocr_service.run_ocr(str(image_full))
wq.ocr_raw_text = ocr_text or None
wq.status = WrongQuestionStatus.ocr_done if ocr_text else WrongQuestionStatus.failed
db.commit()
except Exception:
wq.status = WrongQuestionStatus.failed
db.commit()
return
if not ocr_text:
return
subject_name = wq.subject.name if wq.subject else "综合"
school_level = wq.student.school_level if wq.student else None
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
question_text = loop.run_until_complete(
ollama_service.format_question(subject_name, ocr_text, school_level)
)
solution_text = loop.run_until_complete(
ollama_service.generate_solution(subject_name, question_text, school_level)
)
wq.question_text = question_text
wq.solution_text = solution_text
wq.status = WrongQuestionStatus.solved
db.commit()
except Exception:
wq.status = WrongQuestionStatus.ocr_done
db.commit()
finally:
loop.close()
finally:
db.close()
@router.get("/students/{student_id}/wrong-questions", response_model=list[WrongQuestionOut])
def list_wrong_questions(
student_id: uuid.UUID,
subject_id: int | None = Query(None),
q: str | None = Query(None),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
get_student_for_user(db, student_id, current_user.id)
query = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject))
.filter(WrongQuestion.student_id == student_id)
)
if subject_id is not None:
query = query.filter(WrongQuestion.subject_id == subject_id)
if q:
pattern = f"%{q}%"
query = query.filter(
(WrongQuestion.question_text.ilike(pattern))
| (WrongQuestion.solution_text.ilike(pattern))
| (WrongQuestion.ocr_raw_text.ilike(pattern))
)
items = query.order_by(WrongQuestion.created_at.desc()).all()
return [_wq_to_out(w) for w in items]
@router.post(
"/students/{student_id}/wrong-questions",
response_model=WrongQuestionOut,
status_code=status.HTTP_201_CREATED,
)
async def upload_wrong_question(
student_id: uuid.UUID,
background_tasks: BackgroundTasks,
subject_id: int = Form(...),
file: UploadFile = File(...),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
get_student_for_user(db, student_id, current_user.id)
subject = db.get(Subject, subject_id)
if subject is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="科目不存在")
content = await file.read()
if len(content) > settings.MAX_UPLOAD_SIZE:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="文件超过10MB限制")
wq = WrongQuestion(
student_id=student_id,
subject_id=subject_id,
image_path="",
status=WrongQuestionStatus.pending,
)
db.add(wq)
db.flush()
rel_path = ocr_service.save_upload_file(
str(current_user.id), str(wq.id), file.filename or "image.jpg", content
)
wq.image_path = rel_path
db.commit()
db.refresh(wq)
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject))
.filter(WrongQuestion.id == wq.id)
.first()
)
background_tasks.add_task(_process_wrong_question, wq.id)
return _wq_to_out(wq)
@router.get("/wrong-questions/{question_id}", response_model=WrongQuestionOut)
def get_wrong_question(
question_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None or wq.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
return _wq_to_out(wq)
@router.patch("/wrong-questions/{question_id}", response_model=WrongQuestionOut)
def update_wrong_question(
question_id: uuid.UUID,
data: WrongQuestionUpdate,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None or wq.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
if data.subject_id is not None:
if db.get(Subject, data.subject_id) is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="科目不存在")
wq.subject_id = data.subject_id
if data.question_text is not None:
wq.question_text = data.question_text
if data.solution_text is not None:
wq.solution_text = data.solution_text
db.commit()
db.refresh(wq)
return _wq_to_out(wq)
@router.delete("/wrong-questions/{question_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_wrong_question(
question_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None or wq.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
image_full = Path(settings.UPLOAD_DIR) / wq.image_path
db.delete(wq)
db.commit()
if image_full.exists():
image_full.unlink()
@router.post("/wrong-questions/{question_id}/retry-ocr", response_model=WrongQuestionOut)
def retry_ocr(
question_id: uuid.UUID,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None or wq.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
wq.status = WrongQuestionStatus.pending
db.commit()
background_tasks.add_task(_process_wrong_question, wq.id)
return _wq_to_out(wq)
@router.post("/wrong-questions/{question_id}/regenerate-solution", response_model=WrongQuestionOut)
async def regenerate_solution(
question_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.subject), joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None or wq.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
if not wq.question_text and not wq.ocr_raw_text:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="缺少题目内容")
subject_name = wq.subject.name if wq.subject else "综合"
school_level = wq.student.school_level if wq.student else None
question_text = wq.question_text or wq.ocr_raw_text or ""
try:
if not wq.question_text and wq.ocr_raw_text:
wq.question_text = await ollama_service.format_question(
subject_name, wq.ocr_raw_text, school_level
)
question_text = wq.question_text
wq.solution_text = await ollama_service.generate_solution(
subject_name, question_text, school_level
)
wq.status = WrongQuestionStatus.solved
except Exception as exc:
raise HTTPException(
status_code=status.HTTP_502_BAD_GATEWAY, detail=f"Ollama 调用失败: {exc}"
) from exc
db.commit()
db.refresh(wq)
return _wq_to_out(wq)
@router.get("/wrong-questions/{question_id}/image")
def get_wrong_question_image(
question_id: uuid.UUID,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
wq = (
db.query(WrongQuestion)
.options(joinedload(WrongQuestion.student))
.filter(WrongQuestion.id == question_id)
.first()
)
if wq is None or wq.student.user_id != current_user.id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="错题不存在")
image_full = Path(settings.UPLOAD_DIR) / wq.image_path
if not image_full.exists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="图片不存在")
return FileResponse(image_full)
+183
View File
@@ -0,0 +1,183 @@
from datetime import date, datetime
from enum import Enum
from uuid import UUID
from pydantic import BaseModel, Field, field_validator
class ExamTypeEnum(str, Enum):
weekly = "weekly"
monthly = "monthly"
final = "final"
class WrongQuestionStatusEnum(str, Enum):
pending = "pending"
ocr_done = "ocr_done"
solved = "solved"
failed = "failed"
class SchoolLevelEnum(str, Enum):
junior_high = "junior_high"
senior_high = "senior_high"
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
class UserRegister(BaseModel):
username: str = Field(min_length=3, max_length=64)
password: str = Field(min_length=6, max_length=128)
class UserLogin(BaseModel):
username: str
password: str
class RefreshRequest(BaseModel):
refresh_token: str
class UserOut(BaseModel):
id: UUID
username: str
created_at: datetime
model_config = {"from_attributes": True}
class StudentCreate(BaseModel):
name: str = Field(min_length=1, max_length=64)
school_level: SchoolLevelEnum = SchoolLevelEnum.junior_high
grade: str | None = None
class_name: str | None = None
class StudentUpdate(BaseModel):
name: str | None = None
school_level: SchoolLevelEnum | None = None
grade: str | None = None
class_name: str | None = None
class StudentOut(BaseModel):
id: UUID
name: str
school_level: SchoolLevelEnum
grade: str | None
class_name: str | None
created_at: datetime
model_config = {"from_attributes": True}
class SubjectOut(BaseModel):
id: int
name: str
model_config = {"from_attributes": True}
class ScoreInput(BaseModel):
subject_id: int
total_score: float
obtained_score: float
@field_validator("total_score")
@classmethod
def validate_total(cls, v: float) -> float:
if v <= 0:
raise ValueError("总分必须大于0")
return v
@field_validator("obtained_score")
@classmethod
def validate_obtained(cls, v: float, info) -> float:
total = info.data.get("total_score")
if total is not None and v > total:
raise ValueError("得分不能大于总分")
if v < 0:
raise ValueError("得分不能为负")
return v
class ScoreOut(BaseModel):
id: UUID
subject_id: int
subject_name: str | None = None
total_score: float
obtained_score: float
ratio: float
model_config = {"from_attributes": True}
class ExamCreate(BaseModel):
exam_type: ExamTypeEnum
exam_date: date
title: str | None = None
scores: list[ScoreInput] = []
class ExamUpdate(BaseModel):
exam_type: ExamTypeEnum | None = None
exam_date: date | None = None
title: str | None = None
scores: list[ScoreInput] | None = None
class ExamOut(BaseModel):
id: UUID
exam_type: ExamTypeEnum
exam_date: date
title: str | None
created_at: datetime
scores: list[ScoreOut] = []
model_config = {"from_attributes": True}
class TrendPoint(BaseModel):
exam_id: UUID
exam_type: ExamTypeEnum
exam_date: date
title: str | None
ratio: float
ratio_percent: float
delta: float | None = None
delta_percent: float | None = None
is_volatile: bool = False
direction: str | None = None
class TrendResponse(BaseModel):
subject_id: int
subject_name: str
threshold: float
points: list[TrendPoint]
class WrongQuestionOut(BaseModel):
id: UUID
student_id: UUID
subject_id: int
subject_name: str | None = None
image_path: str
ocr_raw_text: str | None
question_text: str | None
solution_text: str | None
status: WrongQuestionStatusEnum
created_at: datetime
model_config = {"from_attributes": True}
class WrongQuestionUpdate(BaseModel):
question_text: str | None = None
solution_text: str | None = None
subject_id: int | None = None
View File
+20
View File
@@ -0,0 +1,20 @@
from sqlalchemy import inspect, text
from app.core.database import engine
def run_migrations() -> None:
"""Apply lightweight schema updates for existing databases."""
inspector = inspect(engine)
if "students" not in inspector.get_table_names():
return
columns = {col["name"] for col in inspector.get_columns("students")}
if "school_level" not in columns:
with engine.begin() as conn:
conn.execute(
text(
"ALTER TABLE students ADD COLUMN school_level VARCHAR(32) "
"NOT NULL DEFAULT 'junior_high'"
)
)
+40
View File
@@ -0,0 +1,40 @@
from pathlib import Path
from app.core.config import settings
_ocr_engine = None
def get_ocr_engine():
global _ocr_engine
if _ocr_engine is None:
from paddleocr import PaddleOCR
_ocr_engine = PaddleOCR(use_angle_cls=True, lang="ch", show_log=False)
return _ocr_engine
def run_ocr(image_path: str) -> str:
engine = get_ocr_engine()
result = engine.ocr(image_path, cls=True)
if not result or not result[0]:
return ""
lines = []
for line in result[0]:
if line and len(line) >= 2:
text = line[1][0]
if text:
lines.append(text)
return "\n".join(lines)
def save_upload_file(user_id: str, question_id: str, filename: str, content: bytes) -> str:
ext = Path(filename).suffix.lower() or ".jpg"
if ext not in {".jpg", ".jpeg", ".png", ".webp"}:
ext = ".jpg"
user_dir = Path(settings.UPLOAD_DIR) / user_id
user_dir.mkdir(parents=True, exist_ok=True)
rel_path = f"{user_id}/{question_id}{ext}"
full_path = Path(settings.UPLOAD_DIR) / rel_path
full_path.write_bytes(content)
return rel_path
+47
View File
@@ -0,0 +1,47 @@
import httpx
from app.core.config import settings
from app.services.school_level import school_level_label
QUESTION_PROMPT = """你是一位{stage}老师。以下是从试卷 OCR 识别出的文字,可能含有噪声。
科目:{subject}
请整理出清晰的题目内容(保留题号、选项、公式),只输出题目正文,不要解释。
OCR 原文:
{ocr_text}
"""
SOLUTION_PROMPT = """你是一位耐心的{stage}{subject}老师。请为以下题目给出详细解法。
要求:步骤清晰,语言适合{stage}学生理解,指出考点和易错点。
题目:
{question_text}
"""
async def ollama_generate(prompt: str) -> str:
url = f"{settings.OLLAMA_BASE_URL.rstrip('/')}/api/generate"
payload = {
"model": settings.OLLAMA_MODEL,
"prompt": prompt,
"stream": False,
}
async with httpx.AsyncClient(timeout=120.0) as client:
response = await client.post(url, json=payload)
response.raise_for_status()
data = response.json()
return (data.get("response") or "").strip()
async def format_question(subject: str, ocr_text: str, school_level=None) -> str:
stage = school_level_label(school_level)
prompt = QUESTION_PROMPT.format(stage=stage, subject=subject, ocr_text=ocr_text)
return await ollama_generate(prompt)
async def generate_solution(subject: str, question_text: str, school_level=None) -> str:
stage = school_level_label(school_level)
prompt = SOLUTION_PROMPT.format(
stage=stage, subject=subject, question_text=question_text
)
return await ollama_generate(prompt)
+17
View File
@@ -0,0 +1,17 @@
from app.models.user import SchoolLevel
SCHOOL_LEVEL_LABELS: dict[SchoolLevel, str] = {
SchoolLevel.junior_high: "初中",
SchoolLevel.senior_high: "高中",
}
def school_level_label(level: SchoolLevel | str | None) -> str:
if level is None:
return "初中"
if isinstance(level, str):
try:
level = SchoolLevel(level)
except ValueError:
return "初中"
return SCHOOL_LEVEL_LABELS.get(level, "初中")
+66
View File
@@ -0,0 +1,66 @@
from sqlalchemy.orm import Session, joinedload
from app.core.config import settings
from app.models.user import ExamRecord, Subject, SubjectScore
from app.schemas import ExamTypeEnum, TrendPoint, TrendResponse
def build_trend(db: Session, student_id, subject_id: int) -> TrendResponse:
subject = db.get(Subject, subject_id)
if subject is None:
raise ValueError("科目不存在")
scores = (
db.query(SubjectScore)
.join(ExamRecord)
.options(joinedload(SubjectScore.exam_record))
.filter(
ExamRecord.student_id == student_id,
SubjectScore.subject_id == subject_id,
)
.order_by(ExamRecord.exam_date.asc(), ExamRecord.created_at.asc())
.all()
)
threshold = settings.FLUCTUATION_THRESHOLD
points: list[TrendPoint] = []
prev_ratio: float | None = None
for score in scores:
exam = score.exam_record
ratio = float(score.ratio)
delta = None if prev_ratio is None else ratio - prev_ratio
direction = None
is_volatile = False
if delta is not None:
if delta > 0:
direction = "up"
elif delta < 0:
direction = "down"
else:
direction = "flat"
is_volatile = abs(delta) >= threshold
points.append(
TrendPoint(
exam_id=exam.id,
exam_type=ExamTypeEnum(exam.exam_type.value),
exam_date=exam.exam_date,
title=exam.title,
ratio=ratio,
ratio_percent=round(ratio * 100, 2),
delta=delta,
delta_percent=round(delta * 100, 2) if delta is not None else None,
is_volatile=is_volatile,
direction=direction,
)
)
prev_ratio = ratio
return TrendResponse(
subject_id=subject_id,
subject_name=subject.name,
threshold=threshold,
points=points,
)
+23
View File
@@ -0,0 +1,23 @@
from sqlalchemy.orm import Session
from app.models.user import Subject
DEFAULT_SUBJECTS = [
"语文",
"数学",
"英语",
"物理",
"化学",
"生物",
"历史",
"地理",
"政治",
]
def seed_subjects(db: Session) -> None:
existing = {s.name for s in db.query(Subject).all()}
for name in DEFAULT_SUBJECTS:
if name not in existing:
db.add(Subject(name=name))
db.commit()
+13
View File
@@ -0,0 +1,13 @@
import uuid
from fastapi import HTTPException, status
from sqlalchemy.orm import Session
from app.models.user import Student
def get_student_for_user(db: Session, student_id: uuid.UUID, user_id: uuid.UUID) -> Student:
student = db.get(Student, student_id)
if student is None or student.user_id != user_id:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="学生不存在")
return student
+16
View File
@@ -0,0 +1,16 @@
fastapi==0.115.6
uvicorn[standard]==0.34.0
sqlalchemy==2.0.36
psycopg2-binary==2.9.10
alembic==1.14.0
pydantic==2.10.3
pydantic-settings==2.6.1
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
bcrypt==4.2.1
python-multipart==0.0.20
httpx==0.28.1
paddleocr==2.9.1
paddlepaddle==2.6.2
Pillow==11.0.0
aiofiles==24.1.0