Files
cloud-browser/app/browser_manager.py

285 lines
9.2 KiB
Python

import asyncio
import base64
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Optional
from playwright.async_api import Browser, BrowserContext, Page, Playwright, async_playwright
from app.security import (
get_idle_timeout,
get_screencast_max_fps,
get_screencast_nth_frame,
get_screencast_quality,
get_viewport_size,
)
@dataclass
class BrowserSession:
    """Mutable record holding everything that belongs to one browser session.

    The first seven fields are required and positional; the remaining fields
    carry defaults and are filled in or updated while the session is running.
    """

    # --- identity -----------------------------------------------------------
    session_id: str
    url: str  # URL most recently recorded for the session's page

    # --- Playwright handles -------------------------------------------------
    playwright: Playwright
    browser: Browser
    context: BrowserContext
    page: Page
    cdp: Any  # CDP session object attached to `page`

    # --- timestamps (seconds, as returned by time.time()) -------------------
    created_at: float = field(default_factory=time.time)
    last_activity: float = field(default_factory=time.time)

    # --- streaming state ----------------------------------------------------
    # One queue per subscriber; a fresh set is created for every instance.
    subscribers: set[asyncio.Queue] = field(default_factory=set)
    screencast_task: Optional[asyncio.Task] = None
    idle_task: Optional[asyncio.Task] = None

    # --- lifecycle ----------------------------------------------------------
    closed: bool = False

    # --- viewport and frame pacing ------------------------------------------
    viewport_width: int = 1024
    viewport_height: int = 576
    last_frame_sent_at: float = 0.0  # 0.0 until a frame has been recorded
class BrowserManager:
    """Create, track and tear down headless Chromium sessions.

    Each session owns its own Playwright driver, browser, context, page and
    CDP session. Screencast frames and URL changes are fanned out to
    subscriber queues as plain dict messages.
    """

    def __init__(self, max_sessions: int = 1) -> None:
        """Initialise an empty manager.

        Args:
            max_sessions: Upper bound on simultaneously registered sessions.
        """
        self.max_sessions = max_sessions
        self._sessions: dict[str, BrowserSession] = {}
        self._lock = asyncio.Lock()
        # Strong references to fire-and-forget tasks. The event loop keeps
        # only weak references to tasks, so an otherwise unreferenced task
        # may be garbage-collected before it finishes.
        self._background_tasks: set[asyncio.Task] = set()

    def _spawn(self, coro: Any) -> asyncio.Task:
        """Schedule *coro* as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    @staticmethod
    async def _release_browser(context: Any, browser: Any, playwright: Any) -> None:
        """Best-effort shutdown of the Playwright objects; ``None`` is skipped.

        Each step is attempted independently so that a failure in one does
        not prevent the following ones from running.
        """
        if context is not None:
            try:
                await context.close()
            except Exception:
                pass
        if browser is not None:
            try:
                await browser.close()
            except Exception:
                pass
        if playwright is not None:
            try:
                await playwright.stop()
            except Exception:
                pass

    async def create_session(self, url: str) -> BrowserSession:
        """Launch a browser, open *url* and register the resulting session.

        The manager lock is held for the whole call, so the limit check and
        the registration happen atomically with respect to other callers.

        Args:
            url: Address to load in the new page.

        Returns:
            The registered session, with its screencast and idle-watch tasks
            already running.

        Raises:
            RuntimeError: If ``max_sessions`` sessions are already registered.
            Exception: Any error raised while launching the browser or loading
                the page is propagated after the partially created browser
                has been shut down.
        """
        async with self._lock:
            if len(self._sessions) >= self.max_sessions:
                raise RuntimeError(
                    f"已达最大会话数 ({self.max_sessions}),请先关闭现有会话"
                )
            session_id = str(uuid.uuid4())
            width, height = get_viewport_size()
            quality = get_screencast_quality()
            playwright = await async_playwright().start()
            browser = None
            session = None
            try:
                browser = await playwright.chromium.launch(
                    headless=True,
                    args=[
                        "--no-sandbox",
                        "--disable-setuid-sandbox",
                        "--disable-dev-shm-usage",
                    ],
                )
                context = await browser.new_context(
                    viewport={"width": width, "height": height},
                    user_agent=(
                        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/131.0.0.0 Safari/537.36"
                    ),
                )
                page = await context.new_page()
                cdp = await context.new_cdp_session(page)
                session = BrowserSession(
                    session_id=session_id,
                    url=url,
                    playwright=playwright,
                    browser=browser,
                    context=context,
                    page=page,
                    cdp=cdp,
                    viewport_width=width,
                    viewport_height=height,
                )
                await page.goto(url, wait_until="domcontentloaded", timeout=60000)
                # Record the URL the page actually ended up on.
                session.url = page.url
                # The event callback is synchronous, so the async handler is
                # scheduled as a tracked background task.
                page.on(
                    "framenavigated",
                    lambda frame: self._spawn(
                        self._on_frame_navigated(session, frame)
                    ),
                )
                session.screencast_task = asyncio.create_task(
                    self._run_screencast(session, quality)
                )
                session.idle_task = asyncio.create_task(self._watch_idle(session))
            except BaseException:
                # Launch or navigation failed (or the caller was cancelled):
                # shut down whatever was started so no driver or browser
                # process is left behind, then let the error propagate.
                if session is not None:
                    await self._cleanup_session(session)
                else:
                    await self._release_browser(None, browser, playwright)
                raise
            self._sessions[session_id] = session
            return session

    def session_count(self) -> int:
        """Return the number of currently registered sessions."""
        return len(self._sessions)

    async def get_session(self, session_id: str) -> Optional[BrowserSession]:
        """Return the session registered under *session_id*, or ``None``."""
        return self._sessions.get(session_id)

    async def close_session(self, session_id: str) -> None:
        """Unregister *session_id* and tear the session down, if it exists."""
        async with self._lock:
            session = self._sessions.pop(session_id, None)
        # Teardown runs outside the lock; the session is already unregistered.
        if session:
            await self._cleanup_session(session)

    async def close_all(self) -> None:
        """Close every session that is registered at the time of the call."""
        async with self._lock:
            session_ids = list(self._sessions)
        for session_id in session_ids:
            await self.close_session(session_id)

    def touch(self, session: BrowserSession) -> None:
        """Mark *session* as active now (resets the idle clock)."""
        session.last_activity = time.time()

    async def navigate(self, session: BrowserSession, url: str) -> str:
        """Load *url* in the session's page and return the resulting URL."""
        self.touch(session)
        await session.page.goto(url, wait_until="domcontentloaded", timeout=60000)
        session.url = session.page.url
        return session.url

    async def go_back(self, session: BrowserSession) -> str:
        """Go one step back in history and return the resulting URL."""
        self.touch(session)
        await session.page.go_back(wait_until="domcontentloaded", timeout=30000)
        session.url = session.page.url
        return session.url

    async def go_forward(self, session: BrowserSession) -> str:
        """Go one step forward in history and return the resulting URL."""
        self.touch(session)
        await session.page.go_forward(wait_until="domcontentloaded", timeout=30000)
        session.url = session.page.url
        return session.url

    async def reload(self, session: BrowserSession) -> str:
        """Reload the current page and return the resulting URL."""
        self.touch(session)
        await session.page.reload(wait_until="domcontentloaded", timeout=60000)
        session.url = session.page.url
        return session.url

    def subscribe(self, session: BrowserSession) -> asyncio.Queue:
        """Register and return a new bounded message queue for *session*.

        The queue holds at most two messages; ``_broadcast`` discards the
        oldest one when it is full.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=2)
        session.subscribers.add(queue)
        return queue

    def unsubscribe(self, session: BrowserSession, queue: asyncio.Queue) -> None:
        """Remove *queue* from the session's subscribers (no-op if absent)."""
        session.subscribers.discard(queue)

    async def _broadcast(self, session: BrowserSession, message: dict) -> None:
        """Deliver *message* to every subscriber queue without blocking.

        A full queue has its oldest entry dropped to make room. A queue that
        raises any other error is removed from the subscriber set.
        """
        dead: list[asyncio.Queue] = []
        for queue in session.subscribers:
            try:
                queue.put_nowait(message)
            except asyncio.QueueFull:
                try:
                    # Drop the oldest message in favour of the newest one.
                    queue.get_nowait()
                    queue.put_nowait(message)
                except asyncio.QueueEmpty:
                    pass
            except Exception:
                dead.append(queue)
        for queue in dead:
            session.subscribers.discard(queue)

    async def _on_frame_navigated(self, session: BrowserSession, frame) -> None:
        """Record and broadcast the new URL after a main-frame navigation."""
        if session.closed or frame != session.page.main_frame:
            return
        session.url = session.page.url
        await self._broadcast(session, {"type": "url_update", "url": session.url})

    async def _run_screencast(self, session: BrowserSession, quality: int) -> None:
        """Stream JPEG screencast frames of the page to subscribers.

        Starts the CDP screencast, forwards frames (rate-limited by the
        configured maximum FPS) and stops the screencast when the session is
        closed or this task is cancelled.

        Args:
            session: Session whose page is captured.
            quality: Value passed as ``quality`` to ``Page.startScreencast``.
        """
        nth_frame = get_screencast_nth_frame()
        max_fps = get_screencast_max_fps()
        # A non-positive limit disables throttling instead of raising
        # ZeroDivisionError (which would end this task before it starts).
        min_frame_interval = 1.0 / max_fps if max_fps > 0 else 0.0

        async def on_screencast_frame(params: dict) -> None:
            if session.closed:
                return
            session_id = params.get("sessionId")
            # Every frame is acknowledged, including frames that are dropped
            # by the throttle below.
            try:
                await session.cdp.send(
                    "Page.screencastFrameAck", {"sessionId": session_id}
                )
            except Exception:
                return
            now = time.time()
            if now - session.last_frame_sent_at < min_frame_interval:
                return
            data = params.get("data", "")
            try:
                frame_bytes = base64.b64decode(data)
            except Exception:
                return
            session.last_frame_sent_at = now
            await self._broadcast(
                session,
                {
                    "type": "frame",
                    "data": frame_bytes,
                    "url": session.url,
                    "width": session.viewport_width,
                    "height": session.viewport_height,
                },
            )

        def schedule_frame(params: dict) -> None:
            # Synchronous event callback: hand the frame to a tracked task.
            self._spawn(on_screencast_frame(params))

        session.cdp.on("Page.screencastFrame", schedule_frame)
        await session.cdp.send(
            "Page.startScreencast",
            {
                "format": "jpeg",
                "quality": quality,
                "maxWidth": session.viewport_width,
                "maxHeight": session.viewport_height,
                "everyNthFrame": nth_frame,
            },
        )
        try:
            # Frames arrive through the event handler; this loop only keeps
            # the task alive until the session is closed.
            while not session.closed:
                await asyncio.sleep(1)
        finally:
            try:
                await session.cdp.send("Page.stopScreencast")
            except Exception:
                pass

    async def _watch_idle(self, session: BrowserSession) -> None:
        """Close *session* once it has been inactive longer than the timeout.

        Inactivity is checked every 30 seconds against ``last_activity``.
        """
        timeout = get_idle_timeout()
        try:
            while not session.closed:
                await asyncio.sleep(30)
                if time.time() - session.last_activity > timeout:
                    await self.close_session(session.session_id)
                    break
        except asyncio.CancelledError:
            pass

    async def _cleanup_session(self, session: BrowserSession) -> None:
        """Stop the session's tasks, notify subscribers and close the browser.

        Calling it again for a session that is already closed does nothing.
        """
        if session.closed:
            return
        session.closed = True
        current = asyncio.current_task()
        for task in (session.screencast_task, session.idle_task):
            if task is None or task.done():
                continue
            if task is current:
                # Reached from the idle watcher itself: a task must not
                # cancel and await itself. Its loop ends on its own because
                # `closed` is now set.
                continue
            task.cancel()
            try:
                await task
            except (asyncio.CancelledError, Exception):
                # A task failing while it shuts down must not prevent the
                # browser from being closed below.
                pass
        await self._broadcast(session, {"type": "closed", "reason": "session_ended"})
        await self._release_browser(
            session.context, session.browser, session.playwright
        )
# Module-level manager instance, built with the default limit (max_sessions=1).
browser_manager = BrowserManager()