""" 多账户交易中控:监控区 / 系统设置。 聚合各实例监控数据与子代理 /status;下单请在各实例网页操作。 """ from __future__ import annotations import asyncio import os import sys from pathlib import Path _REPO_ROOT = Path(__file__).resolve().parent.parent if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) from env_load import load_hub_dotenv load_hub_dotenv() import httpx from fastapi import Body, FastAPI, HTTPException, Request from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from settings_store import ( enabled_exchanges, env_force_disabled_ids, load_settings, save_settings, ) from hub_web_auth import ( SESSION_COOKIE, SESSION_MAX_AGE_SEC, clear_session_cookie, cookie_secure_for_request, create_session_token, embed_allowed, embed_frame_ancestors, is_public_path, password_required, set_session_cookie, validate_session_token, expected_username, verify_credentials, ) from hub_sso import HUB_SSO_TTL_SEC, mint_hub_sso_token, safe_next_path from url_public import browser_url, default_review_url, public_origin from urllib.parse import urlencode try: from exchange_orders import symbols_match as _symbols_match except ImportError: def _symbols_match(position_symbol: str, order_symbol: str) -> bool: a = (position_symbol or "").strip().upper() b = (order_symbol or "").strip().upper() return bool(a and b and a == b) HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0") HUB_PORT = int(os.getenv("HUB_PORT", "5100")) HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip() _trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower() HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off") _allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower() # 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护 HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on") DIR = Path(__file__).resolve().parent HUB_BUILD = "20260526-hub-key3col" HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8")) HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10")) _board_key_prices_raw = (os.getenv("HUB_BOARD_KEY_PRICES", "true") or "").strip().lower() HUB_BOARD_KEY_PRICES = _board_key_prices_raw in ("1", "true", "yes", "on") def _is_local(host: str | None) -> bool: if not host: return False h = host.lower() return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1") def _ipv4_rfc1918_private(host: str) -> bool: h = host.lower() if h.startswith("::ffff:"): h = h[7:] parts = h.split(".") if len(parts) != 4: return False try: a, b, c, d = (int(x) for x in parts) except ValueError: return False if any(x < 0 or x > 255 for x in (a, b, c, d)): return False if a == 10: return True if a == 172 and 16 <= b <= 31: return True if a == 192 and b == 168: return True return False def _client_allowed(host: str | None) -> bool: if _is_local(host): return True if HUB_TRUST_LAN and host and _ipv4_rfc1918_private(host): return True return False def _hub_headers() -> dict[str, str]: if not HUB_BRIDGE_TOKEN: return {} return {"X-Hub-Token": HUB_BRIDGE_TOKEN} def _agent_headers() -> dict[str, str]: if not HUB_BRIDGE_TOKEN: return {} return {"X-Control-Token": HUB_BRIDGE_TOKEN} def _find_exchange(ex_id: str) -> dict | None: for ex in load_settings().get("exchanges") or []: if str(ex.get("id")) == str(ex_id): return ex return None app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None) STATIC_DIR = DIR / "static" if STATIC_DIR.is_dir(): app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets") @app.middleware("http") async def local_only(request: Request, call_next): if HUB_ALLOW_PUBLIC: return await call_next(request) peer = request.client.host if request.client else None if not _client_allowed(peer): return JSONResponse({"detail": "forbidden"}, status_code=403) return await call_next(request) @app.middleware("http") async def embed_frame_headers(request: Request, call_next): response = await call_next(request) if embed_allowed(): ancestors = embed_frame_ancestors() if ancestors == "*": response.headers["Content-Security-Policy"] = "frame-ancestors *" else: response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {ancestors}" return response @app.middleware("http") async def hub_password_gate(request: Request, call_next): if not password_required(): return await call_next(request) path = request.url.path if is_public_path(path, request.method): return await call_next(request) token = request.cookies.get(SESSION_COOKIE) if validate_session_token(token): return await call_next(request) if path.startswith("/api/"): return JSONResponse({"detail": "未登录", "login_required": True}, status_code=401) from fastapi.responses import RedirectResponse nxt = path if path.startswith("/") else "/monitor" return RedirectResponse(f"/login?next={nxt}", status_code=302) def _shell_page(): index = STATIC_DIR / "index.html" if not index.is_file(): return JSONResponse({"detail": "missing static/index.html"}, status_code=500) return FileResponse(index) def _login_page(): login = STATIC_DIR / "login.html" if not login.is_file(): return JSONResponse({"detail": "missing static/login.html"}, status_code=500) return FileResponse(login) class LoginBody(BaseModel): username: str = "" password: str = "" @app.get("/api/auth/status") def api_auth_status(request: Request): required = password_required() logged_in = not required or validate_session_token(request.cookies.get(SESSION_COOKIE)) return { "required": required, "logged_in": logged_in, "username_hint": expected_username() if required else None, } @app.post("/api/auth/login") def api_auth_login(body: LoginBody, request: Request): if not password_required(): return {"ok": True, "auth_disabled": True} if not verify_credentials(body.username, body.password): raise HTTPException(status_code=401, detail="用户名或密码错误") token = create_session_token(body.username) embed = (request.headers.get("x-hub-embed") or "").strip() == "1" resp = JSONResponse({"ok": True, "session_token": token, "embed": embed}) set_session_cookie(resp, request, token, embed=embed) return resp @app.get("/embed-auth") def embed_auth_login(request: Request, token: str = "", next: str = "/monitor"): """ 嵌入式打开:父页跨域 fetch 登录时 Cookie 可能写不进 iframe, 用 session_token 在本页做一次导航,在 iframe 内写入 hub_sess。 """ from fastapi.responses import RedirectResponse dest = safe_next_path(next) if not password_required(): return RedirectResponse(dest, status_code=302) if not validate_session_token(token): q = urlencode({"next": dest, "embed": "1"}) return RedirectResponse(f"/login?{q}", status_code=302) resp = RedirectResponse(dest, status_code=302) set_session_cookie(resp, request, token, embed=True) return resp @app.post("/api/auth/logout") def api_auth_logout(request: Request): embed = (request.headers.get("x-hub-embed") or "").strip() == "1" resp = JSONResponse({"ok": True}) clear_session_cookie(resp, request, embed=embed) return resp @app.get("/login") def login_page(): return _login_page() @app.get("/") def root_redirect(): from fastapi.responses import RedirectResponse return RedirectResponse("/monitor") @app.get("/monitor") @app.get("/settings") def shell_pages(): return _shell_page() @app.get("/trade") def trade_removed_redirect(): from fastapi.responses import RedirectResponse return RedirectResponse("/monitor", status_code=302) @app.get("/api/settings") def api_get_settings(): return load_settings() class SettingsBody(BaseModel): exchanges: list[dict] = Field(default_factory=list) @app.post("/api/settings") def api_save_settings(body: SettingsBody): force_off = env_force_disabled_ids() to_save = [] for ex in body.exchanges: row = dict(ex) eid = str(row.get("id", "")).strip() if eid in force_off: row["enabled"] = False row.pop("env_disabled", None) to_save.append(row) save_settings({"version": 1, "exchanges": to_save}) return {"ok": True, "settings": load_settings()} @app.get("/api/settings/meta") def api_settings_meta(): po = public_origin() return { "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN), "capability_options": ["key", "trend"], "public_origin": f"{po[0]}://{po[1]}" if po else None, "public_origin_hint": ( "未设置 HUB_PUBLIC_ORIGIN 时,复盘链接若为 127.0.0.1,仅服务器本机浏览器可打开" if not po else "复盘/展示链接已替换为对外地址" ), "password_required": password_required(), } async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict: url = f"{ex['agent_url'].rstrip('/')}/status" try: r = await client.get(url, headers=_agent_headers(), timeout=HUB_AGENT_TIMEOUT) body = r.json() if r.content else {} return { "id": ex["id"], "name": ex["name"], "key": ex.get("key"), "agent_url": ex["agent_url"], "flask_url": ex.get("flask_url"), "capabilities": ex.get("capabilities") or [], "http_ok": r.status_code == 200, "agent": body, "error": body.get("error") if isinstance(body, dict) else None, } except Exception as e: return { "id": ex["id"], "name": ex["name"], "key": ex.get("key"), "agent_url": ex["agent_url"], "flask_url": ex.get("flask_url"), "capabilities": ex.get("capabilities") or [], "http_ok": False, "error": str(e), "agent": None, } def _parse_http_json_body(r: httpx.Response) -> dict: text = (r.text or "").strip() if not text: return {"ok": False, "status": r.status_code, "text": "(empty body)"} try: data = r.json() if isinstance(data, dict): return data return {"ok": False, "status": r.status_code, "text": text[:500]} except Exception: snippet = text[:500] if snippet.lstrip().lower().startswith(" dict | None: base = (ex.get("flask_url") or "").rstrip("/") if not base: return None try: if method == "GET": r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=HUB_FLASK_TIMEOUT) else: headers = {**_hub_headers(), "Content-Type": "application/json"} if json_body is not None: r = await client.post( f"{base}{path}", headers=headers, json=json_body, timeout=120.0 ) else: r = await client.post( f"{base}{path}", headers=headers, data=data, timeout=120.0 ) if r.status_code >= 400: parsed = _parse_http_json_body(r) parsed.setdefault("ok", False) parsed.setdefault("status", r.status_code) return parsed return _parse_http_json_body(r) except Exception as e: return {"ok": False, "error": str(e)} def _flask_error_from_hub_mon(hub_mon: dict | None) -> str | None: if not isinstance(hub_mon, dict) or hub_mon.get("ok") is not False: return None st = hub_mon.get("status") if st == 404: return ( "HTTP 404:该 Flask 未注册 /api/hub/*(hub_bridge 未加载)。" "请在仓库根目录 git pull 后 pm2 restart crypto_binance crypto_gate crypto_gate_bot," "并查看启动日志是否含 [hub_bridge] ImportError" ) return ( hub_mon.get("msg") or hub_mon.get("error") or (f"HTTP {st}" if st else None) or (str(hub_mon.get("text") or "")[:120] or None) ) def _tpsl_slots_to_conditional_orders(exchange_tpsl: dict, symbol: str) -> list[dict]: """将实例 price_snapshot 的 exchange_tpsl 转为中控条件单结构。""" out: list[dict] = [] if not isinstance(exchange_tpsl, dict): return out for role, label in (("sl", "止损"), ("tp", "止盈")): slot = exchange_tpsl.get(role) if not isinstance(slot, dict): continue trig = slot.get("trigger_price") oid = slot.get("order_id") if trig is None or oid is None: continue try: trig_f = float(trig) except (TypeError, ValueError): continue out.append( { "id": str(oid), "symbol": symbol, "channel": "algo", "category": "conditional", "label": f"{label} {trig_f:g}", "trigger_price": trig_f, "amount": None, "status": "open", } ) return out def _find_exchange_tpsl_for_position( symbol: str, side: str, order_prices: list, hub_orders: list, ) -> dict | None: side_l = (side or "").lower() op_by_id = { op.get("id"): op for op in order_prices if isinstance(op, dict) and op.get("id") is not None } for o in hub_orders: if not isinstance(o, dict): continue o_sym = o.get("exchange_symbol") or o.get("symbol") or "" if not _symbols_match(symbol, o_sym): continue if (o.get("direction") or "").lower() != side_l: continue op = op_by_id.get(o.get("id")) if not isinstance(op, dict): continue et = op.get("exchange_tpsl") if isinstance(et, dict) and (et.get("sl") or et.get("tp")): return et for op in order_prices: if not isinstance(op, dict): continue if not _symbols_match(symbol, op.get("symbol") or ""): continue et = op.get("exchange_tpsl") if isinstance(et, dict) and (et.get("sl") or et.get("tp")): return et return None def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: """子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。""" ag = agent_row.get("agent") if not isinstance(ag, dict): return positions = ag.get("positions") if not isinstance(positions, list) or not positions: return if not isinstance(snap, dict): return order_prices = snap.get("order_prices") or [] hub_orders = [] if isinstance(hub_mon, dict): hub_orders = hub_mon.get("orders") or [] for p in positions: if not isinstance(p, dict): continue sym = p.get("symbol") or "" side = p.get("side") or "" et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders) if not et: continue p["exchange_tpsl"] = et cond = p.get("conditional_orders") or [] if not cond: p["conditional_orders"] = _tpsl_slots_to_conditional_orders(et, sym) async def _fetch_exchange_flask_bundle( client: httpx.AsyncClient, ex: dict ) -> tuple[dict | None, dict | None, list | None, dict | None]: """单所 Flask:monitor / meta / price_snapshot(有 flask_url 时)并行拉取。""" caps = ex.get("capabilities") or [] tasks = [ _fetch_flask_json(client, ex, "/api/hub/monitor"), _fetch_flask_json(client, ex, "/api/hub/meta"), ] has_flask = bool((ex.get("flask_url") or "").strip()) if has_flask: tasks.append(_fetch_flask_json(client, ex, "/api/price_snapshot")) results = await asyncio.gather(*tasks) hub_mon = results[0] meta = results[1] snap = results[2] if has_flask and len(results) > 2 else None key_prices = None want_prices = HUB_BOARD_KEY_PRICES and "key" in caps if want_prices and isinstance(snap, dict): key_prices = snap.get("key_prices") return hub_mon, meta, key_prices, snap if isinstance(snap, dict) else None async def _assemble_board_row( client: httpx.AsyncClient, ex: dict, agent_row: dict ) -> dict: hub_mon, meta, key_prices, snap = await _fetch_exchange_flask_bundle(client, ex) _merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False raw_review = (ex.get("review_url") or "").strip() review_link = browser_url(raw_review) if raw_review else default_review_url( ex.get("flask_url") ) return { **agent_row, "flask_url": ex.get("flask_url") or "", "flask_url_browser": browser_url(ex.get("flask_url")), "review_url": review_link, "hub_monitor": hub_mon, "flask_ok": flask_ok, "flask_error": _flask_error_from_hub_mon(hub_mon if isinstance(hub_mon, dict) else None), "meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta, "key_prices": key_prices, } @app.get("/api/monitor/board") async def api_monitor_board(): exchanges = enabled_exchanges() async with httpx.AsyncClient() as client: agent_rows = await asyncio.gather( *[_fetch_agent_status(client, ex) for ex in exchanges] ) out = await asyncio.gather( *[ _assemble_board_row(client, ex, agent_row) for ex, agent_row in zip(exchanges, agent_rows) ] ) return { "rows": list(out), "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"), } def _require_hub_logged_in(request: Request) -> None: if password_required() and not validate_session_token(request.cookies.get(SESSION_COOKIE)): raise HTTPException(status_code=401, detail="未登录中控") @app.get("/api/instance/open-url") def api_instance_open_url( request: Request, exchange_id: str, next: str = "/", embed: str = "" ): """已登录中控时生成实例 SSO 打开链接(2h 有效、单次使用,复用 HUB_BRIDGE_TOKEN)。""" _require_hub_logged_in(request) if not HUB_BRIDGE_TOKEN: raise HTTPException(status_code=503, detail="未配置 HUB_BRIDGE_TOKEN,无法签发实例打开链接") ex = _find_exchange(exchange_id) if not ex: raise HTTPException(status_code=404, detail="未知交易所 id") base = browser_url((ex.get("flask_url") or "").strip()).rstrip("/") if not base: raise HTTPException(status_code=400, detail="该账户未配置 flask_url") ex_key = (ex.get("key") or "").strip().lower() if not ex_key: raise HTTPException(status_code=400, detail="该账户缺少 key(用于 SSO 校验)") nxt = safe_next_path(next) token = mint_hub_sso_token(ex_key, nxt) if not token: raise HTTPException(status_code=503, detail="签发 SSO 失败") params = {"token": token, "next": nxt} if (embed or "").strip().lower() in ("1", "true", "yes", "on"): params["embed"] = "1" q = urlencode(params) return { "ok": True, "url": f"{base}/hub-sso?{q}", "expires_in": HUB_SSO_TTL_SEC, "exchange_id": exchange_id, "exchange_key": ex_key, } class CloseAllBody(BaseModel): exclude_ids: list[str] = Field(default_factory=list) class ClosePositionBody(BaseModel): symbol: str side: str class CancelOrderBody(BaseModel): symbol: str order_id: str channel: str = "regular" class CancelSymbolOrdersBody(BaseModel): symbol: str scope: str = "all" class PlaceTpslBody(BaseModel): symbol: str side: str stop_loss: float take_profit: float contracts: float | None = None @app.post("/api/orders/{exchange_id}/cancel") async def api_cancel_order(exchange_id: str, body: CancelOrderBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/orders/cancel" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={ "symbol": body.symbol, "order_id": body.order_id, "channel": body.channel or "regular", }, timeout=60.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } @app.post("/api/orders/{exchange_id}/cancel-symbol") async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/orders/cancel-symbol" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={"symbol": body.symbol, "scope": body.scope or "all"}, timeout=120.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } @app.post("/api/close/{exchange_id}/position") async def api_close_position(exchange_id: str, body: ClosePositionBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") sym = (body.symbol or "").strip() side = (body.side or "").strip().lower() if not sym: raise HTTPException(status_code=400, detail="symbol 不能为空") if side not in ("long", "short"): raise HTTPException(status_code=400, detail="side 须为 long 或 short") url = f"{ex['agent_url'].rstrip('/')}/emergency/close-position" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={"symbol": sym, "side": side}, timeout=120.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } @app.post("/api/orders/{exchange_id}/place-tpsl") async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/orders/place-tpsl" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={ "symbol": body.symbol, "side": body.side, "stop_loss": body.stop_loss, "take_profit": body.take_profit, "contracts": body.contracts, }, timeout=120.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } @app.post("/api/close/{exchange_id}") async def api_close_exchange(exchange_id: str): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all" async with httpx.AsyncClient() as client: r = await client.post(url, headers=_agent_headers(), timeout=120.0) try: body = r.json() except Exception: body = {"raw": (r.text or "")[:2000]} return {"exchange": ex, "status_code": r.status_code, "payload": body} @app.post("/api/close-all") async def api_close_all(body: CloseAllBody | None = Body(default=None)): excl = set(body.exclude_ids if body else []) excl |= env_force_disabled_ids() targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl] async with httpx.AsyncClient() as client: async def one(ex: dict): url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all" try: r = await client.post(url, headers=_agent_headers(), timeout=120.0) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload} except Exception as e: return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)} results = await asyncio.gather(*[one(ex) for ex in targets]) return {"results": list(results)} def _trade_removed_response(): """旧版前端或缓存页面仍会请求 /api/trade/*,勿解析表单,直接返回说明。""" return JSONResponse( { "ok": False, "result": { "ok": False, "messages": [ "中控已移除下单区。请在监控卡片点击「实例」," "进入对应 crypto_monitor_* 网页添加关键位或下单。" ], }, "deprecated": True, }, status_code=410, ) @app.get("/api/ping") def api_ping(): return { "ok": True, "service": "manual-trading-hub", "build": HUB_BUILD, "trade_ui": False, "features": ["monitor", "settings", "auth"], "password_required": password_required(), "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""), } @app.post("/api/trade/order/{exchange_id}") @app.post("/api/trade/key/{exchange_id}") @app.post("/api/trade/trend/preview/{exchange_id}") @app.post("/api/trade/trend/execute/{exchange_id}") async def api_trade_removed(exchange_id: str): return _trade_removed_response() @app.get("/api/trade/meta/{exchange_id}") @app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}") async def api_trade_removed_get(exchange_id: str, preview_id: str = ""): return _trade_removed_response() def main(): import uvicorn print( f"manual-trading-hub start build={HUB_BUILD} listen={HUB_HOST}:{HUB_PORT}", flush=True, ) uvicorn.run(app, host=HUB_HOST, port=HUB_PORT, log_level="info", access_log=False) if __name__ == "__main__": main()