""" 多账户交易中控:监控区 / 下单区 / 系统设置。 转发至各 crypto_monitor_* 的 /api/hub/* 与子代理 /status。 """ from __future__ import annotations import asyncio import os from pathlib import Path import httpx from fastapi import Body, FastAPI, HTTPException, Query, Request from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from settings_store import ( enabled_exchanges, env_force_disabled_ids, load_settings, save_settings, ) HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0") HUB_PORT = int(os.getenv("HUB_PORT", "5100")) HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip() _trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower() HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off") DIR = Path(__file__).resolve().parent def _is_local(host: str | None) -> bool: if not host: return False h = host.lower() return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1") def _ipv4_rfc1918_private(host: str) -> bool: h = host.lower() if h.startswith("::ffff:"): h = h[7:] parts = h.split(".") if len(parts) != 4: return False try: a, b, c, d = (int(x) for x in parts) except ValueError: return False if any(x < 0 or x > 255 for x in (a, b, c, d)): return False if a == 10: return True if a == 172 and 16 <= b <= 31: return True if a == 192 and b == 168: return True return False def _client_allowed(host: str | None) -> bool: if _is_local(host): return True if HUB_TRUST_LAN and host and _ipv4_rfc1918_private(host): return True return False def _hub_headers() -> dict[str, str]: if not HUB_BRIDGE_TOKEN: return {} return {"X-Hub-Token": HUB_BRIDGE_TOKEN} def _agent_headers() -> dict[str, str]: if not HUB_BRIDGE_TOKEN: return {} return {"X-Control-Token": HUB_BRIDGE_TOKEN} def _find_exchange(ex_id: str) -> dict | None: for ex in load_settings().get("exchanges") or []: if str(ex.get("id")) == str(ex_id): return ex return None app = FastAPI(title="hub", docs_url=None, redoc_url=None) STATIC_DIR = DIR / "static" if STATIC_DIR.is_dir(): app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets") @app.middleware("http") async def local_only(request: Request, call_next): if request.client and not _client_allowed(request.client.host): return JSONResponse({"detail": "forbidden"}, status_code=403) return await call_next(request) def _shell_page(): index = STATIC_DIR / "index.html" if not index.is_file(): return JSONResponse({"detail": "missing static/index.html"}, status_code=500) return FileResponse(index) @app.get("/") def root_redirect(): from fastapi.responses import RedirectResponse return RedirectResponse("/monitor") @app.get("/monitor") @app.get("/trade") @app.get("/settings") def shell_pages(): return _shell_page() @app.get("/api/settings") def api_get_settings(): return load_settings() class SettingsBody(BaseModel): exchanges: list[dict] = Field(default_factory=list) @app.post("/api/settings") def api_save_settings(body: SettingsBody): data = {"version": 1, "exchanges": body.exchanges} save_settings(data) return {"ok": True} @app.get("/api/settings/meta") def api_settings_meta(): return { "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN), "capability_options": ["order", "key", "trend"], } async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict: url = f"{ex['agent_url'].rstrip('/')}/status" try: r = await client.get(url, headers=_agent_headers(), timeout=12.0) body = r.json() if r.content else {} return { "id": ex["id"], "name": ex["name"], "key": ex.get("key"), "agent_url": ex["agent_url"], "flask_url": ex.get("flask_url"), "capabilities": ex.get("capabilities") or [], "http_ok": r.status_code == 200, "agent": body, "error": body.get("error") if isinstance(body, dict) else None, } except Exception as e: return { "id": ex["id"], "name": ex["name"], "key": ex.get("key"), "agent_url": ex["agent_url"], "flask_url": ex.get("flask_url"), "capabilities": ex.get("capabilities") or [], "http_ok": False, "error": str(e), "agent": None, } async def _fetch_flask_json( client: httpx.AsyncClient, ex: dict, path: str, method: str = "GET", data=None ) -> dict | None: base = (ex.get("flask_url") or "").rstrip("/") if not base: return None try: if method == "GET": r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=15.0) else: r = await client.post(f"{base}{path}", headers=_hub_headers(), data=data, timeout=120.0) if r.status_code >= 400: return {"ok": False, "status": r.status_code, "text": (r.text or "")[:500]} return r.json() except Exception as e: return {"ok": False, "error": str(e)} @app.get("/api/monitor/board") async def api_monitor_board(): exchanges = enabled_exchanges() async with httpx.AsyncClient() as client: agent_rows = await asyncio.gather(*[_fetch_agent_status(client, ex) for ex in exchanges]) out = [] for ex, agent_row in zip(exchanges, agent_rows): hub_mon = await _fetch_flask_json(client, ex, "/api/hub/monitor") meta = await _fetch_flask_json(client, ex, "/api/hub/meta") key_prices = None if "key" in (ex.get("capabilities") or []): snap = await _fetch_flask_json(client, ex, "/api/price_snapshot") if isinstance(snap, dict): key_prices = snap.get("key_prices") flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False flask_err = None if isinstance(hub_mon, dict) and hub_mon.get("ok") is False: flask_err = ( hub_mon.get("msg") or hub_mon.get("error") or (str(hub_mon.get("text") or "")[:200] or None) ) out.append( { **agent_row, "review_url": ex.get("review_url") or "", "hub_monitor": hub_mon, "flask_ok": flask_ok, "flask_error": flask_err, "meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta, "key_prices": key_prices, } ) return {"rows": out, "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds")} class CloseAllBody(BaseModel): exclude_ids: list[str] = Field(default_factory=list) @app.post("/api/close/{exchange_id}") async def api_close_exchange(exchange_id: str): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all" async with httpx.AsyncClient() as client: r = await client.post(url, headers=_agent_headers(), timeout=120.0) try: body = r.json() except Exception: body = {"raw": (r.text or "")[:2000]} return {"exchange": ex, "status_code": r.status_code, "payload": body} @app.post("/api/close-all") async def api_close_all(body: CloseAllBody | None = Body(default=None)): excl = set(body.exclude_ids if body else []) excl |= env_force_disabled_ids() targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl] async with httpx.AsyncClient() as client: async def one(ex: dict): url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all" try: r = await client.post(url, headers=_agent_headers(), timeout=120.0) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload} except Exception as e: return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)} results = await asyncio.gather(*[one(ex) for ex in targets]) return {"results": list(results)} @app.get("/api/trade/meta/{exchange_id}") async def api_trade_meta(exchange_id: str): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") async with httpx.AsyncClient() as client: meta = await _fetch_flask_json(client, ex, "/api/hub/meta") return {"exchange": ex, "meta": meta} @app.post("/api/trade/order/{exchange_id}") async def api_trade_order(exchange_id: str, request: Request): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") form = await request.form() async with httpx.AsyncClient() as client: result = await _fetch_flask_json(client, ex, "/api/hub/add_order", "POST", dict(form)) return {"exchange": ex, "result": result} @app.post("/api/trade/key/{exchange_id}") async def api_trade_key(exchange_id: str, request: Request): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") if "key" not in (ex.get("capabilities") or []): raise HTTPException(status_code=400, detail="该账户不支持关键位") form = await request.form() async with httpx.AsyncClient() as client: result = await _fetch_flask_json(client, ex, "/api/hub/add_key", "POST", dict(form)) return {"exchange": ex, "result": result} @app.post("/api/trade/trend/preview/{exchange_id}") async def api_trade_trend_preview(exchange_id: str, request: Request): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") if "trend" not in (ex.get("capabilities") or []): raise HTTPException(status_code=400, detail="该账户不支持趋势回调") form = await request.form() async with httpx.AsyncClient() as client: result = await _fetch_flask_json(client, ex, "/api/hub/trend/preview", "POST", dict(form)) return {"exchange": ex, "result": result} @app.post("/api/trade/trend/execute/{exchange_id}") async def api_trade_trend_execute(exchange_id: str, request: Request): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") form = await request.form() async with httpx.AsyncClient() as client: result = await _fetch_flask_json(client, ex, "/api/hub/trend/execute", "POST", dict(form)) return {"exchange": ex, "result": result} @app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}") async def api_trade_trend_preview_get(exchange_id: str, preview_id: str): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") async with httpx.AsyncClient() as client: result = await _fetch_flask_json(client, ex, f"/api/hub/trend/preview/{preview_id}") return {"exchange": ex, "result": result} def main(): import uvicorn uvicorn.run(app, host=HUB_HOST, port=HUB_PORT, log_level="warning", access_log=False) if __name__ == "__main__": main()