"""FastAPI application setup for the Cloud Browser service.

Declares the request/response models, the cookie-based authentication
helpers, and the application object together with its lifespan hook.
"""

import asyncio
import json
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Optional

from fastapi import (
    Cookie,
    Depends,
    FastAPI,
    HTTPException,
    WebSocket,
    WebSocketDisconnect,
)
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field

from app.auth import auth_manager
from app.browser_manager import browser_manager
from app.input_handler import handle_input
from app.security import SecurityError, get_max_sessions, validate_url

# Directory holding index.html, viewer.html and the other static assets.
STATIC_DIR = Path(__file__).resolve().parent.parent / "static"
# Name of the cookie that carries the login token.
SESSION_COOKIE = "cloud_browser_token"


class CreateSessionRequest(BaseModel):
    """Body of ``POST /api/session``: the URL to open in a new session."""

    url: str = Field(..., min_length=1, max_length=2048)


class CreateSessionResponse(BaseModel):
    """Identifier and URL of a freshly created browser session."""

    session_id: str
    url: str


class NavigateRequest(BaseModel):
    """Body of ``POST /api/session/{session_id}/navigate``."""

    url: str = Field(..., min_length=1, max_length=2048)


class LoginRequest(BaseModel):
    """Credentials submitted to ``POST /api/auth/login``."""

    username: str = Field(..., min_length=1, max_length=64)
    password: str = Field(..., min_length=1, max_length=128)


class ChangeCredentialsRequest(BaseModel):
    """Current and replacement credentials for the change-credentials route.

    The new username and password carry stricter minimum lengths (2 and 4)
    than the current ones (1 and 1).
    """

    current_username: str = Field(..., min_length=1, max_length=64)
    current_password: str = Field(..., min_length=1, max_length=128)
    new_username: str = Field(..., min_length=2, max_length=64)
    new_password: str = Field(..., min_length=4, max_length=128)


def get_current_user(token: Optional[str] = Cookie(None, alias=SESSION_COOKIE)) -> str:
    """Resolve the logged-in username from the session cookie.

    Intended for use as a FastAPI dependency.

    Args:
        token: Value of the ``SESSION_COOKIE`` cookie, or ``None`` if absent.

    Returns:
        The username that ``auth_manager.verify_token`` yields for the token.

    Raises:
        HTTPException: 401 when the token is missing or fails verification.
    """
    # A missing cookie is verified as an empty token so it fails the same way.
    username = auth_manager.verify_token(token or "")
    if not username:
        raise HTTPException(status_code=401, detail="未登录或登录已过期")
    return username


def _set_auth_cookie(response: JSONResponse, token: str) -> JSONResponse:
    """Attach the login token to ``response`` as an HTTP-only cookie.

    The cookie is valid for seven days on every path and uses
    ``SameSite=Lax``. The same response object is returned for chaining.
    """
    response.set_cookie(
        key=SESSION_COOKIE,
        value=token,
        httponly=True,
        samesite="lax",
        max_age=60 * 60 * 24 * 7,  # seven days, in seconds
        path="/",
    )
    return response


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Configure the browser manager at startup and tear it down at shutdown.

    On entry ``browser_manager.max_sessions`` is set from
    ``get_max_sessions()``. On exit every browser session is closed.
    """
    browser_manager.max_sessions = get_max_sessions()
    try:
        yield
    finally:
        # Run teardown even when an exception or cancellation is delivered at
        # the yield point, so browser sessions are not left behind.
        await browser_manager.close_all()


app = FastAPI(title="Cloud Browser", lifespan=lifespan)
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")


async def _get_session_or_404(session_id: str, detail: str = "会话不存在"):
    """Return the session for ``session_id`` or raise a 404.

    Args:
        session_id: Identifier taken from the request path.
        detail: Error message placed in the 404 response.

    Raises:
        HTTPException: 404 when ``browser_manager.get_session`` returns a
            falsy value.
    """
    session = await browser_manager.get_session(session_id)
    if not session:
        raise HTTPException(status_code=404, detail=detail)
    return session


@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the landing page."""
    return FileResponse(STATIC_DIR / "index.html")


@app.get("/view/{session_id}", response_class=HTMLResponse)
async def view_page(session_id: str):
    """Serve the viewer page for an existing session (404 otherwise)."""
    await _get_session_or_404(session_id, detail="会话不存在或已过期")
    return FileResponse(STATIC_DIR / "viewer.html")


@app.get("/api/health")
async def health():
    """Report liveness together with the current session count."""
    return {"status": "ok", "sessions": browser_manager.session_count}


@app.get("/api/auth/me")
async def auth_me(user: str = Depends(get_current_user)):
    """Return the username of the authenticated caller."""
    return {"username": user}


@app.post("/api/auth/login")
async def auth_login(body: LoginRequest):
    """Check the credentials and, on success, set the session cookie.

    Raises:
        HTTPException: 401 when ``auth_manager.authenticate`` rejects them.
    """
    if not auth_manager.authenticate(body.username, body.password):
        raise HTTPException(status_code=401, detail="用户名或密码错误")
    token = auth_manager.create_token(body.username)
    response = JSONResponse({"username": body.username, "message": "登录成功"})
    return _set_auth_cookie(response, token)


@app.post("/api/auth/logout")
async def auth_logout():
    """Delete the session cookie on the client."""
    response = JSONResponse({"message": "已退出登录"})
    response.delete_cookie(SESSION_COOKIE, path="/")
    return response


@app.post("/api/auth/change-credentials")
async def change_credentials(
    body: ChangeCredentialsRequest,
    user: str = Depends(get_current_user),
):
    """Replace the caller's username/password and issue a new cookie.

    Raises:
        HTTPException: 403 when ``current_username`` differs from the
            logged-in user; 400 when ``auth_manager.change_credentials``
            raises ``ValueError``.
    """
    if body.current_username != user:
        raise HTTPException(status_code=403, detail="当前用户名与登录账号不一致")
    try:
        auth_manager.change_credentials(
            body.current_username,
            body.current_password,
            body.new_username,
            body.new_password,
        )
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    # A token is created for the new username and replaces the old cookie.
    token = auth_manager.create_token(body.new_username)
    response = JSONResponse({"username": body.new_username, "message": "账号已更新"})
    return _set_auth_cookie(response, token)


@app.post("/api/session", response_model=CreateSessionResponse)
async def create_session(body: CreateSessionRequest, user: str = Depends(get_current_user)):
    """Validate the URL and open a new browser session on it.

    Raises:
        HTTPException: 400 when ``validate_url`` raises ``SecurityError``;
            429 when ``browser_manager.create_session`` raises
            ``RuntimeError``; 500 for any other failure while creating the
            session.
    """
    try:
        url = validate_url(body.url)
    except SecurityError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    try:
        session = await browser_manager.create_session(url)
    except RuntimeError as exc:
        raise HTTPException(status_code=429, detail=str(exc)) from exc
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"创建会话失败: {exc}") from exc

    return CreateSessionResponse(session_id=session.session_id, url=session.url)


@app.delete("/api/session/{session_id}")
async def delete_session(session_id: str, user: str = Depends(get_current_user)):
    """Close an existing session (404 when it does not exist)."""
    await _get_session_or_404(session_id)
    await browser_manager.close_session(session_id)
    return {"status": "closed"}


@app.post("/api/session/{session_id}/navigate")
async def navigate_session(
    session_id: str,
    body: NavigateRequest,
    user: str = Depends(get_current_user),
):
    """Navigate an existing session to a validated URL.

    The session lookup happens before URL validation, so an unknown session
    yields 404 even when the URL is also invalid.

    Raises:
        HTTPException: 404 for an unknown session; 400 when ``validate_url``
            raises ``SecurityError``; 500 when navigation fails.
    """
    session = await _get_session_or_404(session_id)
    try:
        url = validate_url(body.url)
    except SecurityError as exc:
        raise HTTPException(status_code=400, detail=str(exc)) from exc

    try:
        current_url = await browser_manager.navigate(session, url)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"导航失败: {exc}") from exc
    return {"url": current_url}


@app.post("/api/session/{session_id}/back")
async def go_back(session_id: str, user: str = Depends(get_current_user)):
    """Go one step back in the session's history (400 on failure)."""
    session = await _get_session_or_404(session_id)
    try:
        url = await browser_manager.go_back(session)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"无法后退: {exc}") from exc
    return {"url": url}


@app.post("/api/session/{session_id}/forward")
async def go_forward(session_id: str, user: str = Depends(get_current_user)):
    """Go one step forward in the session's history (400 on failure)."""
    session = await _get_session_or_404(session_id)
    try:
        url = await browser_manager.go_forward(session)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"无法前进: {exc}") from exc
    return {"url": url}


@app.post("/api/session/{session_id}/reload")
async def reload_page(session_id: str, user: str = Depends(get_current_user)):
    """Reload the session's current page (500 on failure)."""
    session = await _get_session_or_404(session_id)
    try:
        url = await browser_manager.reload(session)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"刷新失败: {exc}") from exc
    return {"url": url}


@app.websocket("/ws/{session_id}")
async def websocket_stream(
    websocket: WebSocket,
    session_id: str,
    cloud_browser_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE),
):
    """Stream frames of a session to the client and apply its input.

    After authentication and session lookup the socket is accepted, an
    ``init`` message is sent, and a background task forwards queued frames
    while this coroutine handles incoming JSON messages (``ping``,
    ``navigate``, or anything else, which goes to ``handle_input``).

    The queue subscription is always released on exit, whether the
    connection ends normally or through an error.
    """
    # NOTE(review): both early exits close the socket before accept(); the
    # custom 44xx codes may not reach the client in that case - confirm
    # against the ASGI server in use.
    if not auth_manager.verify_token(cloud_browser_token or ""):
        await websocket.close(code=4401, reason="未登录")
        return

    session = await browser_manager.get_session(session_id)
    if not session:
        await websocket.close(code=4404, reason="会话不存在")
        return

    await websocket.accept()
    queue = browser_manager.subscribe(session)

    async def forward_frames():
        """Relay queued messages to the client until the session closes."""
        while not session.closed:
            try:
                message = await asyncio.wait_for(queue.get(), timeout=30)
            except asyncio.TimeoutError:
                # Wake up periodically so a closed session ends the loop.
                continue

            kind = message.get("type")
            if kind == "frame":
                await websocket.send_bytes(message["data"])
                frame_url = message.get("url")
                if frame_url and frame_url != session.url:
                    await websocket.send_json({"type": "url", "url": frame_url})
            elif kind == "closed":
                await websocket.send_json(message)
                break

    forward_task = None
    try:
        # Everything after subscribe() runs inside this try block so that the
        # subscription is released even if the first send fails.
        await websocket.send_json(
            {
                "type": "init",
                "url": session.url,
                "width": session.viewport_width,
                "height": session.viewport_height,
            }
        )
        forward_task = asyncio.create_task(forward_frames())

        while True:
            raw = await websocket.receive_text()
            try:
                payload = json.loads(raw)
            except json.JSONDecodeError:
                await websocket.send_json({"type": "error", "message": "无效 JSON"})
                continue

            # Valid JSON that is not an object (e.g. a list or number) has no
            # .get(); reject it instead of crashing the connection.
            if not isinstance(payload, dict):
                await websocket.send_json(
                    {"type": "error", "message": "消息必须是 JSON 对象"}
                )
                continue

            action_type = payload.get("type", "input")

            if action_type == "ping":
                browser_manager.touch(session)
                await websocket.send_json({"type": "pong"})
                continue

            if action_type == "navigate":
                try:
                    url = validate_url(payload.get("url", ""))
                    current_url = await browser_manager.navigate(session, url)
                    await websocket.send_json({"type": "url", "url": current_url})
                except SecurityError as exc:
                    await websocket.send_json({"type": "error", "message": str(exc)})
                except Exception as exc:
                    await websocket.send_json(
                        {"type": "error", "message": f"导航失败: {exc}"}
                    )
                continue

            result = await handle_input(browser_manager, session, payload)
            if result:
                await websocket.send_json(result)
    except WebSocketDisconnect:
        pass
    finally:
        try:
            if forward_task is not None:
                forward_task.cancel()
                try:
                    await forward_task
                except (asyncio.CancelledError, WebSocketDisconnect):
                    # Cancellation is the expected outcome; a disconnect means
                    # the forwarder noticed the peer leaving first.
                    pass
        finally:
            # Must run even when awaiting the forwarder re-raises its own
            # failure, otherwise the subscription would leak.
            browser_manager.unsubscribe(session, queue)