Files
crypto_monitor/manual_trading_hub/hub.py
T
2026-05-24 08:44:16 +08:00

620 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
多账户交易中控:监控区 / 系统设置。
聚合各实例监控数据与子代理 /status;下单请在各实例网页操作。
"""
from __future__ import annotations
import asyncio
import os
from pathlib import Path
from env_load import load_hub_dotenv
load_hub_dotenv()
import httpx
from fastapi import Body, FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from settings_store import (
enabled_exchanges,
env_force_disabled_ids,
load_settings,
save_settings,
)
from hub_web_auth import (
SESSION_COOKIE,
SESSION_MAX_AGE_SEC,
cookie_secure_for_request,
create_session_token,
is_public_path,
password_required,
validate_session_token,
expected_username,
verify_credentials,
)
from url_public import browser_url, default_review_url, public_origin
HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0")
HUB_PORT = int(os.getenv("HUB_PORT", "5100"))
HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip()
_trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower()
HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off")
DIR = Path(__file__).resolve().parent
HUB_BUILD = "20260525-fs-fix"
def _is_local(host: str | None) -> bool:
if not host:
return False
h = host.lower()
return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1")
def _ipv4_rfc1918_private(host: str) -> bool:
h = host.lower()
if h.startswith("::ffff:"):
h = h[7:]
parts = h.split(".")
if len(parts) != 4:
return False
try:
a, b, c, d = (int(x) for x in parts)
except ValueError:
return False
if any(x < 0 or x > 255 for x in (a, b, c, d)):
return False
if a == 10:
return True
if a == 172 and 16 <= b <= 31:
return True
if a == 192 and b == 168:
return True
return False
def _client_allowed(host: str | None) -> bool:
if _is_local(host):
return True
if HUB_TRUST_LAN and host and _ipv4_rfc1918_private(host):
return True
return False
def _hub_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Hub-Token": HUB_BRIDGE_TOKEN}
def _agent_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Control-Token": HUB_BRIDGE_TOKEN}
def _find_exchange(ex_id: str) -> dict | None:
for ex in load_settings().get("exchanges") or []:
if str(ex.get("id")) == str(ex_id):
return ex
return None
app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None)
STATIC_DIR = DIR / "static"
if STATIC_DIR.is_dir():
app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets")
@app.middleware("http")
async def local_only(request: Request, call_next):
if request.client and not _client_allowed(request.client.host):
return JSONResponse({"detail": "forbidden"}, status_code=403)
return await call_next(request)
@app.middleware("http")
async def hub_password_gate(request: Request, call_next):
if not password_required():
return await call_next(request)
path = request.url.path
if is_public_path(path, request.method):
return await call_next(request)
token = request.cookies.get(SESSION_COOKIE)
if validate_session_token(token):
return await call_next(request)
if path.startswith("/api/"):
return JSONResponse({"detail": "未登录", "login_required": True}, status_code=401)
from fastapi.responses import RedirectResponse
nxt = path if path.startswith("/") else "/monitor"
return RedirectResponse(f"/login?next={nxt}", status_code=302)
def _shell_page():
index = STATIC_DIR / "index.html"
if not index.is_file():
return JSONResponse({"detail": "missing static/index.html"}, status_code=500)
return FileResponse(index)
def _login_page():
login = STATIC_DIR / "login.html"
if not login.is_file():
return JSONResponse({"detail": "missing static/login.html"}, status_code=500)
return FileResponse(login)
class LoginBody(BaseModel):
username: str = ""
password: str = ""
@app.get("/api/auth/status")
def api_auth_status(request: Request):
required = password_required()
logged_in = not required or validate_session_token(request.cookies.get(SESSION_COOKIE))
return {
"required": required,
"logged_in": logged_in,
"username_hint": expected_username() if required else None,
}
@app.post("/api/auth/login")
def api_auth_login(body: LoginBody, request: Request):
if not password_required():
return {"ok": True, "auth_disabled": True}
if not verify_credentials(body.username, body.password):
raise HTTPException(status_code=401, detail="用户名或密码错误")
token = create_session_token(body.username)
secure = cookie_secure_for_request(request)
resp = JSONResponse({"ok": True})
resp.set_cookie(
SESSION_COOKIE,
token,
httponly=True,
samesite="lax",
path="/",
max_age=SESSION_MAX_AGE_SEC,
secure=secure,
)
return resp
@app.post("/api/auth/logout")
def api_auth_logout(request: Request):
secure = cookie_secure_for_request(request)
resp = JSONResponse({"ok": True})
resp.delete_cookie(SESSION_COOKIE, path="/", secure=secure)
return resp
@app.get("/login")
def login_page():
return _login_page()
@app.get("/")
def root_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor")
@app.get("/monitor")
@app.get("/settings")
def shell_pages():
return _shell_page()
@app.get("/trade")
def trade_removed_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor", status_code=302)
@app.get("/api/settings")
def api_get_settings():
return load_settings()
class SettingsBody(BaseModel):
exchanges: list[dict] = Field(default_factory=list)
@app.post("/api/settings")
def api_save_settings(body: SettingsBody):
force_off = env_force_disabled_ids()
to_save = []
for ex in body.exchanges:
row = dict(ex)
eid = str(row.get("id", "")).strip()
if eid in force_off:
row["enabled"] = False
row.pop("env_disabled", None)
to_save.append(row)
save_settings({"version": 1, "exchanges": to_save})
return {"ok": True, "settings": load_settings()}
@app.get("/api/settings/meta")
def api_settings_meta():
po = public_origin()
return {
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN),
"capability_options": ["key", "trend"],
"public_origin": f"{po[0]}://{po[1]}" if po else None,
"public_origin_hint": (
"未设置 HUB_PUBLIC_ORIGIN 时,复盘链接若为 127.0.0.1,仅服务器本机浏览器可打开"
if not po
else "复盘/展示链接已替换为对外地址"
),
"password_required": password_required(),
}
async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict:
url = f"{ex['agent_url'].rstrip('/')}/status"
try:
r = await client.get(url, headers=_agent_headers(), timeout=12.0)
body = r.json() if r.content else {}
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": r.status_code == 200,
"agent": body,
"error": body.get("error") if isinstance(body, dict) else None,
}
except Exception as e:
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": False,
"error": str(e),
"agent": None,
}
def _parse_http_json_body(r: httpx.Response) -> dict:
text = (r.text or "").strip()
if not text:
return {"ok": False, "status": r.status_code, "text": "(empty body)"}
try:
data = r.json()
if isinstance(data, dict):
return data
return {"ok": False, "status": r.status_code, "text": text[:500]}
except Exception:
snippet = text[:500]
if snippet.lstrip().lower().startswith("<!") or "internal server error" in snippet.lower():
return {
"ok": False,
"status": r.status_code,
"messages": [f"实例返回 HTML 错误(HTTP {r.status_code}),请查看该 Flask 日志"],
"text": snippet,
}
return {"ok": False, "status": r.status_code, "messages": [snippet], "text": snippet}
async def _fetch_flask_json(
client: httpx.AsyncClient, ex: dict, path: str, method: str = "GET", data=None
) -> dict | None:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return None
try:
if method == "GET":
r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=15.0)
else:
r = await client.post(f"{base}{path}", headers=_hub_headers(), data=data, timeout=120.0)
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
parsed.setdefault("status", r.status_code)
return parsed
return _parse_http_json_body(r)
except Exception as e:
return {"ok": False, "error": str(e)}
@app.get("/api/monitor/board")
async def api_monitor_board():
exchanges = enabled_exchanges()
async with httpx.AsyncClient() as client:
agent_rows = await asyncio.gather(*[_fetch_agent_status(client, ex) for ex in exchanges])
out = []
for ex, agent_row in zip(exchanges, agent_rows):
hub_mon = await _fetch_flask_json(client, ex, "/api/hub/monitor")
meta = await _fetch_flask_json(client, ex, "/api/hub/meta")
key_prices = None
if "key" in (ex.get("capabilities") or []):
snap = await _fetch_flask_json(client, ex, "/api/price_snapshot")
if isinstance(snap, dict):
key_prices = snap.get("key_prices")
flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False
flask_err = None
if isinstance(hub_mon, dict) and hub_mon.get("ok") is False:
st = hub_mon.get("status")
if st == 404:
flask_err = (
"HTTP 404:该 Flask 未注册 /api/hub/*hub_bridge 未加载)。"
"请在仓库根目录 git pull 后 pm2 restart crypto_binance crypto_gate crypto_gate_bot"
"并查看启动日志是否含 [hub_bridge] ImportError"
)
else:
flask_err = (
hub_mon.get("msg")
or hub_mon.get("error")
or (f"HTTP {st}" if st else None)
or (str(hub_mon.get("text") or "")[:120] or None)
)
raw_review = (ex.get("review_url") or "").strip()
review_link = browser_url(raw_review) if raw_review else default_review_url(
ex.get("flask_url")
)
out.append(
{
**agent_row,
"flask_url": ex.get("flask_url") or "",
"flask_url_browser": browser_url(ex.get("flask_url")),
"review_url": review_link,
"hub_monitor": hub_mon,
"flask_ok": flask_ok,
"flask_error": flask_err,
"meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta,
"key_prices": key_prices,
}
)
return {"rows": out, "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds")}
class CloseAllBody(BaseModel):
exclude_ids: list[str] = Field(default_factory=list)
class ClosePositionBody(BaseModel):
symbol: str
side: str
class CancelOrderBody(BaseModel):
symbol: str
order_id: str
channel: str = "regular"
class CancelSymbolOrdersBody(BaseModel):
symbol: str
scope: str = "all"
class PlaceTpslBody(BaseModel):
symbol: str
side: str
stop_loss: float
take_profit: float
contracts: float | None = None
@app.post("/api/orders/{exchange_id}/cancel")
async def api_cancel_order(exchange_id: str, body: CancelOrderBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/cancel"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={
"symbol": body.symbol,
"order_id": body.order_id,
"channel": body.channel or "regular",
},
timeout=60.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
@app.post("/api/orders/{exchange_id}/cancel-symbol")
async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/cancel-symbol"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={"symbol": body.symbol, "scope": body.scope or "all"},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
@app.post("/api/close/{exchange_id}/position")
async def api_close_position(exchange_id: str, body: ClosePositionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
sym = (body.symbol or "").strip()
side = (body.side or "").strip().lower()
if not sym:
raise HTTPException(status_code=400, detail="symbol 不能为空")
if side not in ("long", "short"):
raise HTTPException(status_code=400, detail="side 须为 long 或 short")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-position"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={"symbol": sym, "side": side},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
@app.post("/api/orders/{exchange_id}/place-tpsl")
async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/place-tpsl"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={
"symbol": body.symbol,
"side": body.side,
"stop_loss": body.stop_loss,
"take_profit": body.take_profit,
"contracts": body.contracts,
},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
@app.post("/api/close/{exchange_id}")
async def api_close_exchange(exchange_id: str):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
async with httpx.AsyncClient() as client:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
body = r.json()
except Exception:
body = {"raw": (r.text or "")[:2000]}
return {"exchange": ex, "status_code": r.status_code, "payload": body}
@app.post("/api/close-all")
async def api_close_all(body: CloseAllBody | None = Body(default=None)):
excl = set(body.exclude_ids if body else [])
excl |= env_force_disabled_ids()
targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl]
async with httpx.AsyncClient() as client:
async def one(ex: dict):
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
try:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload}
except Exception as e:
return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)}
results = await asyncio.gather(*[one(ex) for ex in targets])
return {"results": list(results)}
def _trade_removed_response():
"""旧版前端或缓存页面仍会请求 /api/trade/*,勿解析表单,直接返回说明。"""
return JSONResponse(
{
"ok": False,
"result": {
"ok": False,
"messages": [
"中控已移除下单区。请在监控卡片点击「实例」,"
"进入对应 crypto_monitor_* 网页添加关键位或下单。"
],
},
"deprecated": True,
},
status_code=410,
)
@app.get("/api/ping")
def api_ping():
return {
"ok": True,
"service": "manual-trading-hub",
"build": HUB_BUILD,
"trade_ui": False,
"features": ["monitor", "settings", "auth"],
"password_required": password_required(),
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""),
}
@app.post("/api/trade/order/{exchange_id}")
@app.post("/api/trade/key/{exchange_id}")
@app.post("/api/trade/trend/preview/{exchange_id}")
@app.post("/api/trade/trend/execute/{exchange_id}")
async def api_trade_removed(exchange_id: str):
return _trade_removed_response()
@app.get("/api/trade/meta/{exchange_id}")
@app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}")
async def api_trade_removed_get(exchange_id: str, preview_id: str = ""):
return _trade_removed_response()
def main():
import uvicorn
uvicorn.run(app, host=HUB_HOST, port=HUB_PORT, log_level="warning", access_log=False)
if __name__ == "__main__":
main()