import asyncio import base64 import time import uuid from dataclasses import dataclass, field from typing import Any, Optional from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright from app.security import ( get_idle_timeout, get_screencast_max_fps, get_screencast_nth_frame, get_screencast_quality, get_viewport_size, ) @dataclass class BrowserSession: session_id: str url: str playwright: Playwright browser: Browser context: BrowserContext page: Page cdp: Any created_at: float = field(default_factory=time.time) last_activity: float = field(default_factory=time.time) subscribers: set[asyncio.Queue] = field(default_factory=set) screencast_task: Optional[asyncio.Task] = None idle_task: Optional[asyncio.Task] = None closed: bool = False viewport_width: int = 1024 viewport_height: int = 576 last_frame_sent_at: float = 0.0 class BrowserManager: def __init__(self, max_sessions: int = 1) -> None: self.max_sessions = max_sessions self._sessions: dict[str, BrowserSession] = {} self._lock = asyncio.Lock() async def create_session(self, url: str) -> BrowserSession: async with self._lock: if len(self._sessions) >= self.max_sessions: raise RuntimeError( f"已达最大会话数 ({self.max_sessions}),请先关闭现有会话" ) session_id = str(uuid.uuid4()) width, height = get_viewport_size() quality = get_screencast_quality() playwright = await async_playwright().start() browser = await playwright.chromium.launch( headless=True, args=[ "--no-sandbox", "--disable-setuid-sandbox", "--disable-dev-shm-usage", ], ) context = await browser.new_context( viewport={"width": width, "height": height}, user_agent=( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/131.0.0.0 Safari/537.36" ), ) page = await context.new_page() cdp = await context.new_cdp_session(page) session = BrowserSession( session_id=session_id, url=url, playwright=playwright, browser=browser, context=context, page=page, cdp=cdp, viewport_width=width, viewport_height=height, ) await page.goto(url, wait_until="domcontentloaded", timeout=60000) session.url = page.url page.on( "framenavigated", lambda frame: asyncio.create_task( self._on_frame_navigated(session, frame) ), ) session.screencast_task = asyncio.create_task( self._run_screencast(session, quality) ) session.idle_task = asyncio.create_task(self._watch_idle(session)) self._sessions[session_id] = session return session def session_count(self) -> int: return len(self._sessions) async def get_session(self, session_id: str) -> Optional[BrowserSession]: return self._sessions.get(session_id) async def close_session(self, session_id: str) -> None: async with self._lock: session = self._sessions.pop(session_id, None) if session: await self._cleanup_session(session) async def close_all(self) -> None: async with self._lock: session_ids = list(self._sessions.keys()) for session_id in session_ids: await self.close_session(session_id) def touch(self, session: BrowserSession) -> None: session.last_activity = time.time() async def navigate(self, session: BrowserSession, url: str) -> str: self.touch(session) await session.page.goto(url, wait_until="domcontentloaded", timeout=60000) session.url = session.page.url return session.url async def go_back(self, session: BrowserSession) -> str: self.touch(session) await session.page.go_back(wait_until="domcontentloaded", timeout=30000) session.url = session.page.url return session.url async def go_forward(self, session: BrowserSession) -> str: self.touch(session) await session.page.go_forward(wait_until="domcontentloaded", timeout=30000) session.url = session.page.url return session.url async def reload(self, session: BrowserSession) -> str: self.touch(session) await session.page.reload(wait_until="domcontentloaded", timeout=60000) session.url = session.page.url return session.url def subscribe(self, session: BrowserSession) -> asyncio.Queue: queue: asyncio.Queue = asyncio.Queue(maxsize=2) session.subscribers.add(queue) return queue def unsubscribe(self, session: BrowserSession, queue: asyncio.Queue) -> None: session.subscribers.discard(queue) async def _broadcast(self, session: BrowserSession, message: dict) -> None: dead: list[asyncio.Queue] = [] for queue in session.subscribers: try: queue.put_nowait(message) except asyncio.QueueFull: try: queue.get_nowait() queue.put_nowait(message) except asyncio.QueueEmpty: pass except Exception: dead.append(queue) for queue in dead: session.subscribers.discard(queue) async def _on_frame_navigated(self, session: BrowserSession, frame) -> None: if session.closed or frame != session.page.main_frame: return session.url = session.page.url await self._broadcast(session, {"type": "url_update", "url": session.url}) async def _run_screencast(self, session: BrowserSession, quality: int) -> None: nth_frame = get_screencast_nth_frame() min_frame_interval = 1.0 / get_screencast_max_fps() async def on_screencast_frame(params: dict) -> None: if session.closed: return session_id = params.get("sessionId") try: await session.cdp.send( "Page.screencastFrameAck", {"sessionId": session_id} ) except Exception: return now = time.time() if now - session.last_frame_sent_at < min_frame_interval: return data = params.get("data", "") try: frame_bytes = base64.b64decode(data) except Exception: return session.last_frame_sent_at = now await self._broadcast( session, { "type": "frame", "data": frame_bytes, "url": session.url, "width": session.viewport_width, "height": session.viewport_height, }, ) def schedule_frame(params: dict) -> None: asyncio.create_task(on_screencast_frame(params)) session.cdp.on("Page.screencastFrame", schedule_frame) await session.cdp.send( "Page.startScreencast", { "format": "jpeg", "quality": quality, "maxWidth": session.viewport_width, "maxHeight": session.viewport_height, "everyNthFrame": nth_frame, }, ) try: while not session.closed: await asyncio.sleep(1) finally: try: await session.cdp.send("Page.stopScreencast") except Exception: pass async def _watch_idle(self, session: BrowserSession) -> None: timeout = get_idle_timeout() try: while not session.closed: await asyncio.sleep(30) if time.time() - session.last_activity > timeout: await self.close_session(session.session_id) break except asyncio.CancelledError: pass async def _cleanup_session(self, session: BrowserSession) -> None: if session.closed: return session.closed = True for task in (session.screencast_task, session.idle_task): if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: pass await self._broadcast(session, {"type": "closed", "reason": "session_ended"}) try: await session.context.close() except Exception: pass try: await session.browser.close() except Exception: pass try: await session.playwright.stop() except Exception: pass browser_manager = BrowserManager()