Files
crypto_monitor/manual_trading_hub/hub.py
T
dekun f9301b92b9 feat(hub): trend plan breakeven and stop from monitor fullscreen
Proxy /api/hub/trend/stop and breakeven to instances; enable offset input and actions in hub UI. Add horizontal padding on strategy records page.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-04 14:46:24 +08:00

1428 lines
47 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
多账户交易中控:监控区 / 系统设置。
聚合各实例监控数据与子代理 /status;下单请在各实例网页操作。
"""
from __future__ import annotations
import asyncio
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days
from hub_ohlcv_lib import CHART_TIMEFRAME_ORDER, CHART_TIMEFRAMES, bar_limit_for_timeframe
from env_load import load_hub_dotenv
load_hub_dotenv()
import httpx
from fastapi import Body, FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from settings_store import (
enabled_exchanges,
env_force_disabled_ids,
load_settings,
save_settings,
)
from hub_web_auth import (
SESSION_COOKIE,
SESSION_MAX_AGE_SEC,
clear_session_cookie,
cookie_secure_for_request,
create_session_token,
embed_allowed,
embed_frame_ancestors,
is_public_path,
password_required,
set_session_cookie,
validate_session_token,
expected_username,
verify_credentials,
)
from hub_sso import HUB_SSO_TTL_SEC, mint_hub_sso_token, safe_next_path
from url_public import browser_url, default_review_url, public_origin
from urllib.parse import urlencode
from hub_board_cache import HUB_BOARD_POLL_INTERVAL, board_store
from hub_chart_cache import (
HUB_CHART_POLL_INTERVAL,
HUB_CHART_WATCH_TTL_SEC,
chart_poll_store,
parse_series_key,
)
try:
from exchange_orders import symbols_match as _symbols_match
except ImportError:
def _symbols_match(position_symbol: str, order_symbol: str) -> bool:
a = (position_symbol or "").strip().upper()
b = (order_symbol or "").strip().upper()
return bool(a and b and a == b)
HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0")
HUB_PORT = int(os.getenv("HUB_PORT", "5100"))
HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip()
_trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower()
HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off")
_allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower()
# 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护
HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on")
DIR = Path(__file__).resolve().parent
HUB_BUILD = "20260603-hub-board-sse"
HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8"))
HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10"))
HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45"))
_board_key_prices_raw = (os.getenv("HUB_BOARD_KEY_PRICES", "true") or "").strip().lower()
HUB_BOARD_KEY_PRICES = _board_key_prices_raw in ("1", "true", "yes", "on")
def _is_local(host: str | None) -> bool:
if not host:
return False
h = host.lower()
return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1")
def _ipv4_rfc1918_private(host: str) -> bool:
h = host.lower()
if h.startswith("::ffff:"):
h = h[7:]
parts = h.split(".")
if len(parts) != 4:
return False
try:
a, b, c, d = (int(x) for x in parts)
except ValueError:
return False
if any(x < 0 or x > 255 for x in (a, b, c, d)):
return False
if a == 10:
return True
if a == 172 and 16 <= b <= 31:
return True
if a == 192 and b == 168:
return True
return False
def _client_allowed(host: str | None) -> bool:
if _is_local(host):
return True
if HUB_TRUST_LAN and host and _ipv4_rfc1918_private(host):
return True
return False
def _hub_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Hub-Token": HUB_BRIDGE_TOKEN}
def _agent_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Control-Token": HUB_BRIDGE_TOKEN}
def _find_exchange(ex_id: str) -> dict | None:
for ex in load_settings().get("exchanges") or []:
if str(ex.get("id")) == str(ex_id):
return ex
return None
async def _run_chart_poll() -> dict:
keys = chart_poll_store.active_series_keys()
if not keys:
return {"ok": True, "series_count": 0, "polled": 0}
polled = 0
errors: list[str] = []
for key in keys:
parsed = parse_series_key(key)
if not parsed:
continue
ex_k, sym, tf = parsed
ex = _find_exchange_by_key(ex_k)
if not ex or not ex.get("enabled"):
continue
ex_ref = ex
sym_ref = sym
tf_ref = tf
def remote_fetch(**kwargs) -> dict:
tf_use = kwargs.get("timeframe") or tf_ref
return _fetch_instance_ohlcv_sync(
ex_ref,
symbol=kwargs.get("symbol") or sym_ref,
timeframe=tf_use,
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)),
)
try:
result = await asyncio.to_thread(
resolve_chart_bars,
ex_k,
sym,
tf,
remote_fetch,
force_refresh=False,
tail_refresh=True,
)
polled += 1
chart_poll_store.note_series_result(
ex_k,
sym,
tf,
ok=bool(result.get("ok")),
fetched=int(result.get("fetched") or 0),
error=None if result.get("ok") else str(result.get("msg") or "poll_failed"),
)
if not result.get("ok"):
errors.append(f"{key}:{result.get('msg')}")
except Exception as e:
chart_poll_store.note_series_result(ex_k, sym, tf, ok=False, error=str(e))
errors.append(f"{key}:{e}")
out: dict = {"ok": True, "series_count": len(keys), "polled": polled}
if errors:
out["errors"] = errors[:8]
return out
async def _run_board_aggregate() -> dict:
try:
body = await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT)
return {"ok": True, **body}
except asyncio.TimeoutError:
return {
"ok": False,
"rows": [],
"error": "board_timeout",
"msg": (
f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。"
"请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT"
),
"updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"),
}
def _schedule_board_refresh() -> None:
board_store.request_refresh()
@asynccontextmanager
async def _hub_lifespan(_app: FastAPI):
await board_store.start(_run_board_aggregate)
await chart_poll_store.start(_run_chart_poll)
try:
yield
finally:
await chart_poll_store.stop()
await board_store.stop()
app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None, lifespan=_hub_lifespan)
STATIC_DIR = DIR / "static"
if STATIC_DIR.is_dir():
app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets")
@app.middleware("http")
async def local_only(request: Request, call_next):
if HUB_ALLOW_PUBLIC:
return await call_next(request)
peer = request.client.host if request.client else None
if not _client_allowed(peer):
return JSONResponse({"detail": "forbidden"}, status_code=403)
return await call_next(request)
@app.middleware("http")
async def embed_frame_headers(request: Request, call_next):
response = await call_next(request)
if embed_allowed():
ancestors = embed_frame_ancestors()
if ancestors == "*":
response.headers["Content-Security-Policy"] = "frame-ancestors *"
else:
response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {ancestors}"
return response
@app.middleware("http")
async def hub_password_gate(request: Request, call_next):
if not password_required():
return await call_next(request)
path = request.url.path
if is_public_path(path, request.method):
return await call_next(request)
token = request.cookies.get(SESSION_COOKIE)
if validate_session_token(token):
return await call_next(request)
if path.startswith("/api/"):
return JSONResponse({"detail": "未登录", "login_required": True}, status_code=401)
from fastapi.responses import RedirectResponse
nxt = path if path.startswith("/") else "/monitor"
return RedirectResponse(f"/login?next={nxt}", status_code=302)
def _shell_page():
index = STATIC_DIR / "index.html"
if not index.is_file():
return JSONResponse({"detail": "missing static/index.html"}, status_code=500)
return FileResponse(index)
def _login_page():
login = STATIC_DIR / "login.html"
if not login.is_file():
return JSONResponse({"detail": "missing static/login.html"}, status_code=500)
return FileResponse(login)
class LoginBody(BaseModel):
username: str = ""
password: str = ""
@app.get("/api/auth/status")
def api_auth_status(request: Request):
required = password_required()
logged_in = not required or validate_session_token(request.cookies.get(SESSION_COOKIE))
return {
"required": required,
"logged_in": logged_in,
"username_hint": expected_username() if required else None,
}
@app.post("/api/auth/login")
def api_auth_login(body: LoginBody, request: Request):
if not password_required():
return {"ok": True, "auth_disabled": True}
if not verify_credentials(body.username, body.password):
raise HTTPException(status_code=401, detail="用户名或密码错误")
token = create_session_token(body.username)
embed = (request.headers.get("x-hub-embed") or "").strip() == "1"
resp = JSONResponse({"ok": True, "session_token": token, "embed": embed})
set_session_cookie(resp, request, token, embed=embed)
return resp
@app.get("/embed-auth")
def embed_auth_login(request: Request, token: str = "", next: str = "/monitor"):
"""
嵌入式打开:父页跨域 fetch 登录时 Cookie 可能写不进 iframe
用 session_token 在本页做一次导航,在 iframe 内写入 hub_sess。
"""
from fastapi.responses import RedirectResponse
dest = safe_next_path(next)
if not password_required():
return RedirectResponse(dest, status_code=302)
if not validate_session_token(token):
q = urlencode({"next": dest, "embed": "1"})
return RedirectResponse(f"/login?{q}", status_code=302)
resp = RedirectResponse(dest, status_code=302)
set_session_cookie(resp, request, token, embed=True)
return resp
@app.post("/api/auth/logout")
def api_auth_logout(request: Request):
embed = (request.headers.get("x-hub-embed") or "").strip() == "1"
resp = JSONResponse({"ok": True})
clear_session_cookie(resp, request, embed=embed)
return resp
@app.get("/login")
def login_page():
return _login_page()
@app.get("/")
def root_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor")
@app.get("/monitor")
@app.get("/market")
@app.get("/settings")
def shell_pages():
return _shell_page()
@app.get("/trade")
def trade_removed_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor", status_code=302)
@app.get("/api/settings")
def api_get_settings():
return load_settings()
class SettingsBody(BaseModel):
exchanges: list[dict] = Field(default_factory=list)
@app.post("/api/settings")
def api_save_settings(body: SettingsBody):
force_off = env_force_disabled_ids()
to_save = []
for ex in body.exchanges:
row = dict(ex)
eid = str(row.get("id", "")).strip()
if eid in force_off:
row["enabled"] = False
row.pop("env_disabled", None)
to_save.append(row)
save_settings({"version": 1, "exchanges": to_save})
return {"ok": True, "settings": load_settings()}
def _find_exchange_by_key(exchange_key: str) -> dict | None:
key = (exchange_key or "").strip().lower()
if not key:
return None
for ex in load_settings().get("exchanges") or []:
if str(ex.get("key") or "").strip().lower() == key:
return ex
if str(ex.get("id") or "").strip() == exchange_key.strip():
return ex
return None
def _fetch_instance_ohlcv_sync(
ex: dict,
*,
symbol: str,
timeframe: str,
since_ms: int | None,
limit: int,
) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"symbol": symbol, "timeframe": timeframe, "limit": str(int(limit))}
if since_ms is not None and int(since_ms) > 0:
params["since_ms"] = str(int(since_ms))
url = f"{base}/api/hub/ohlcv?{urlencode(params)}"
try:
with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
return parsed
data = r.json() if r.content else {}
return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
@app.get("/api/chart/meta")
def api_chart_meta():
tfs = [tf for tf in CHART_TIMEFRAME_ORDER if tf in CHART_TIMEFRAMES]
exchanges = []
for ex in enabled_exchanges(load_settings()):
exchanges.append(
{
"id": ex.get("id"),
"key": ex.get("key"),
"name": ex.get("name"),
}
)
return {
"ok": True,
"timeframes": [tf for tf in tfs if tf in CHART_TIMEFRAMES],
"retention_days": retention_days(),
"limits": {tf: bar_limit_for_timeframe(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"exchanges": exchanges,
}
@app.get("/api/chart/ohlcv")
def api_chart_ohlcv(
exchange_key: str = "",
symbol: str = "",
timeframe: str = "1d",
refresh: str = "",
):
ex = _find_exchange_by_key(exchange_key)
if not ex:
raise HTTPException(status_code=400, detail="交易所不存在")
if not ex.get("enabled"):
raise HTTPException(status_code=400, detail="该交易所未启用")
sym = (symbol or "").strip().upper()
if not sym:
raise HTTPException(status_code=400, detail="请输入币种")
ex_key = str(ex.get("key") or "").strip().lower()
force = (refresh or "").strip().lower() in ("1", "true", "yes", "on")
def remote_fetch(**kwargs):
return _fetch_instance_ohlcv_sync(
ex,
symbol=kwargs.get("symbol") or sym,
timeframe=kwargs.get("timeframe") or timeframe,
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or bar_limit_for_timeframe(timeframe)),
)
result = resolve_chart_bars(
ex_key,
sym,
timeframe,
remote_fetch,
force_refresh=force,
)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "K线加载失败")
tick = result.get("price_tick")
last = result["candles"][-1] if result.get("candles") else None
result["ohlcv"] = format_ohlcv_detail(
{
"open": last.get("open") if last else None,
"high": last.get("high") if last else None,
"low": last.get("low") if last else None,
"close": last.get("close") if last else None,
"volume": last.get("volume") if last else None,
}
if last
else None,
tick,
)
result["chart_version"] = chart_poll_store.version
result["series_version"] = chart_poll_store.series_version(ex_key, sym, timeframe)
result["chart_poll_interval_sec"] = HUB_CHART_POLL_INTERVAL
return result
class ChartWatchBody(BaseModel):
exchange_key: str = ""
symbol: str = ""
timeframe: str = "5m"
@app.post("/api/chart/watch")
async def api_chart_watch(body: ChartWatchBody = Body(...)):
ex_k = (body.exchange_key or "").strip().lower()
sym = (body.symbol or "").strip().upper()
tf = (body.timeframe or "5m").strip()
if not ex_k or not sym:
raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol")
if tf not in CHART_TIMEFRAMES:
raise HTTPException(status_code=400, detail="不支持的周期")
key = chart_poll_store.touch_watch(ex_k, sym, tf)
chart_poll_store.request_refresh()
return {
"ok": True,
"series_key": key,
"series_version": chart_poll_store.series_version(ex_k, sym, tf),
"chart_version": chart_poll_store.version,
"watch_ttl_sec": HUB_CHART_WATCH_TTL_SEC,
}
@app.post("/api/chart/unwatch")
async def api_chart_unwatch(body: ChartWatchBody = Body(...)):
chart_poll_store.clear_watch(body.exchange_key, body.symbol, body.timeframe)
return {"ok": True}
@app.get("/api/chart/stream")
async def api_chart_stream():
from fastapi.responses import StreamingResponse
return StreamingResponse(
chart_poll_store.iter_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.get("/api/chart/poll/meta")
async def api_chart_poll_meta():
return chart_poll_store.event_dict()
@app.get("/api/settings/meta")
def api_settings_meta():
po = public_origin()
return {
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN),
"capability_options": ["key", "trend"],
"public_origin": f"{po[0]}://{po[1]}" if po else None,
"public_origin_hint": (
"未设置 HUB_PUBLIC_ORIGIN 时,复盘链接若为 127.0.0.1,仅服务器本机浏览器可打开"
if not po
else "复盘/展示链接已替换为对外地址"
),
"password_required": password_required(),
}
async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict:
url = f"{ex['agent_url'].rstrip('/')}/status"
try:
r = await client.get(url, headers=_agent_headers(), timeout=HUB_AGENT_TIMEOUT)
body = r.json() if r.content else {}
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": r.status_code == 200,
"agent": body,
"error": body.get("error") if isinstance(body, dict) else None,
}
except Exception as e:
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": False,
"error": str(e),
"agent": None,
}
def _parse_http_json_body(r: httpx.Response) -> dict:
text = (r.text or "").strip()
if not text:
return {"ok": False, "status": r.status_code, "text": "(empty body)"}
try:
data = r.json()
if isinstance(data, dict):
return data
return {"ok": False, "status": r.status_code, "text": text[:500]}
except Exception:
snippet = text[:500]
if snippet.lstrip().lower().startswith("<!") or "internal server error" in snippet.lower():
return {
"ok": False,
"status": r.status_code,
"messages": [f"实例返回 HTML 错误(HTTP {r.status_code}),请查看该 Flask 日志"],
"text": snippet,
}
return {"ok": False, "status": r.status_code, "messages": [snippet], "text": snippet}
async def _fetch_flask_json(
client: httpx.AsyncClient,
ex: dict,
path: str,
method: str = "GET",
data=None,
json_body: dict | None = None,
) -> dict | None:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return None
try:
if method == "GET":
r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=HUB_FLASK_TIMEOUT)
else:
headers = {**_hub_headers(), "Content-Type": "application/json"}
if json_body is not None:
r = await client.post(
f"{base}{path}", headers=headers, json=json_body, timeout=120.0
)
else:
r = await client.post(
f"{base}{path}", headers=headers, data=data, timeout=120.0
)
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
parsed.setdefault("status", r.status_code)
return parsed
return _parse_http_json_body(r)
except Exception as e:
return {"ok": False, "error": str(e)}
def _flask_error_from_hub_mon(hub_mon: dict | None) -> str | None:
if not isinstance(hub_mon, dict) or hub_mon.get("ok") is not False:
return None
st = hub_mon.get("status")
if st == 404:
return (
"HTTP 404:该 Flask 未注册 /api/hub/*hub_bridge 未加载)。"
"请在仓库根目录 git pull 后 pm2 restart crypto_binance crypto_gate crypto_gate_bot"
"并查看启动日志是否含 [hub_bridge] ImportError"
)
return (
hub_mon.get("msg")
or hub_mon.get("error")
or (f"HTTP {st}" if st else None)
or (str(hub_mon.get("text") or "")[:120] or None)
)
def _tpsl_slots_to_conditional_orders(exchange_tpsl: dict, symbol: str) -> list[dict]:
"""将实例 price_snapshot 的 exchange_tpsl 转为中控条件单结构。"""
out: list[dict] = []
if not isinstance(exchange_tpsl, dict):
return out
for role, label in (("sl", "止损"), ("tp", "止盈")):
slot = exchange_tpsl.get(role)
if not isinstance(slot, dict):
continue
trig = slot.get("trigger_price")
oid = slot.get("order_id")
if trig is None or oid is None:
continue
try:
trig_f = float(trig)
except (TypeError, ValueError):
continue
out.append(
{
"id": str(oid),
"symbol": symbol,
"channel": "algo",
"category": "conditional",
"label": f"{label} {trig_f:g}",
"trigger_price": trig_f,
"amount": None,
"status": "open",
}
)
return out
def _find_exchange_tpsl_for_position(
symbol: str,
side: str,
order_prices: list,
hub_orders: list,
) -> dict | None:
side_l = (side or "").lower()
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(symbol, o_sym):
continue
if (o.get("direction") or "").lower() != side_l:
continue
op = op_by_id.get(o.get("id"))
if not isinstance(op, dict):
continue
et = op.get("exchange_tpsl")
if isinstance(et, dict) and (et.get("sl") or et.get("tp")):
return et
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(symbol, op.get("symbol") or ""):
continue
et = op.get("exchange_tpsl")
if isinstance(et, dict) and (et.get("sl") or et.get("tp")):
return et
return None
def _merge_flask_order_price_fields(hub_mon: dict | None, snap: dict | None) -> None:
"""将 price_snapshot 中的快照盈亏比、已保本状态合并进 hub_monitor.orders。"""
if not isinstance(hub_mon, dict) or not isinstance(snap, dict):
return
order_prices = snap.get("order_prices") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
orders = hub_mon.get("orders") or []
if not isinstance(orders, list):
return
for o in orders:
if not isinstance(o, dict):
continue
op = op_by_id.get(o.get("id"))
if not isinstance(op, dict):
continue
if op.get("rr_ratio") is not None:
o["rr_ratio"] = op["rr_ratio"]
if "sl_breakeven_secured" in op:
o["sl_breakeven_secured"] = bool(op["sl_breakeven_secured"])
def _merge_flask_position_breakeven(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None:
"""将 price_snapshot 的已保本状态同步到 agent 持仓,供中控首页表格展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict) or not isinstance(snap, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for p in positions:
if not isinstance(p, dict):
continue
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
matched = None
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(sym, o_sym):
continue
if (o.get("direction") or "").lower() != side:
continue
matched = op_by_id.get(o.get("id"))
break
if matched is None:
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(sym, op.get("symbol") or ""):
continue
matched = op
break
if isinstance(matched, dict) and "sl_breakeven_secured" in matched:
p["sl_breakeven_secured"] = bool(matched["sl_breakeven_secured"])
def _agent_position_has_mark(p: dict) -> bool:
try:
v = float(p.get("mark_price"))
return v > 0
except (TypeError, ValueError):
return False
def _apply_agent_mark_price(p: dict, mark_price: object, mark_display: object = None) -> None:
try:
mpf = float(mark_price)
except (TypeError, ValueError):
return
if mpf <= 0:
return
p["mark_price"] = mpf
disp = mark_display
if disp is not None and str(disp).strip() not in ("", "-"):
p["mark_price_fmt"] = str(disp)
def _find_matched_order_price_op(
p: dict,
order_prices: list,
hub_orders: list,
op_by_id: dict,
) -> dict | None:
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(sym, o_sym):
continue
if (o.get("direction") or "").lower() != side:
continue
matched = op_by_id.get(o.get("id"))
if isinstance(matched, dict):
return matched
break
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(sym, op.get("symbol") or ""):
continue
return op
return None
def _merge_flask_position_mark_price(
agent_row: dict, snap: dict | None, hub_mon: dict | None
) -> None:
"""子代理无标记价时,用实例 price_snapshot 的交易所标记价补全中控持仓展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict) or not isinstance(snap, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for p in positions:
if not isinstance(p, dict) or _agent_position_has_mark(p):
continue
matched = _find_matched_order_price_op(p, order_prices, hub_orders, op_by_id)
if isinstance(matched, dict):
_apply_agent_mark_price(
p,
matched.get("exchange_mark_price"),
matched.get("exchange_mark_price_display"),
)
position_marks = snap.get("position_marks") or []
if not isinstance(position_marks, list):
return
for p in positions:
if not isinstance(p, dict) or _agent_position_has_mark(p):
continue
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
for pm in position_marks:
if not isinstance(pm, dict):
continue
if not _symbols_match(sym, pm.get("symbol") or ""):
continue
if (pm.get("side") or "").lower() != side:
continue
_apply_agent_mark_price(
p, pm.get("mark_price"), pm.get("mark_price_display")
)
break
def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None:
"""子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
if not isinstance(snap, dict):
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
for p in positions:
if not isinstance(p, dict):
continue
sym = p.get("symbol") or ""
side = p.get("side") or ""
et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders)
if not et:
continue
p["exchange_tpsl"] = et
cond = p.get("conditional_orders") or []
if not cond:
p["conditional_orders"] = _tpsl_slots_to_conditional_orders(et, sym)
async def _fetch_exchange_flask_bundle(
client: httpx.AsyncClient, ex: dict
) -> tuple[dict | None, dict | None, list | None, dict | None]:
"""单所 Flaskmonitor / meta / price_snapshot(有 flask_url 时)并行拉取。"""
caps = ex.get("capabilities") or []
tasks = [
_fetch_flask_json(client, ex, "/api/hub/monitor"),
_fetch_flask_json(client, ex, "/api/hub/meta"),
]
has_flask = bool((ex.get("flask_url") or "").strip())
if has_flask:
tasks.append(_fetch_flask_json(client, ex, "/api/price_snapshot"))
results = await asyncio.gather(*tasks)
hub_mon = results[0]
meta = results[1]
snap = results[2] if has_flask and len(results) > 2 else None
key_prices = None
want_prices = HUB_BOARD_KEY_PRICES and "key" in caps
if want_prices and isinstance(snap, dict):
key_prices = snap.get("key_prices")
return hub_mon, meta, key_prices, snap if isinstance(snap, dict) else None
async def _assemble_board_row(
client: httpx.AsyncClient, ex: dict, agent_row: dict
) -> dict:
hub_mon, meta, key_prices, snap = await _fetch_exchange_flask_bundle(client, ex)
if isinstance(hub_mon, dict):
_merge_flask_order_price_fields(hub_mon, snap)
_merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
_merge_flask_position_breakeven(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
_merge_flask_position_mark_price(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False
raw_review = (ex.get("review_url") or "").strip()
review_link = browser_url(raw_review) if raw_review else default_review_url(
ex.get("flask_url")
)
return {
**agent_row,
"flask_url": ex.get("flask_url") or "",
"flask_url_browser": browser_url(ex.get("flask_url")),
"review_url": review_link,
"hub_monitor": hub_mon,
"flask_ok": flask_ok,
"flask_error": _flask_error_from_hub_mon(hub_mon if isinstance(hub_mon, dict) else None),
"meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta,
"key_prices": key_prices,
}
async def _build_monitor_board_payload() -> dict:
exchanges = enabled_exchanges()
async with httpx.AsyncClient() as client:
agent_rows = await asyncio.gather(
*[_fetch_agent_status(client, ex) for ex in exchanges]
)
out = await asyncio.gather(
*[
_assemble_board_row(client, ex, agent_row)
for ex, agent_row in zip(exchanges, agent_rows)
]
)
return {
"rows": list(out),
"updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"),
}
@app.get("/api/monitor/board")
@app.get("/api/monitor/board/snapshot")
async def api_monitor_board_snapshot():
"""读后台缓存快照;完整聚合由 hub 每 HUB_BOARD_POLL_INTERVAL 秒执行。"""
return board_store.snapshot_dict()
@app.get("/api/monitor/board/stream")
async def api_monitor_board_stream():
from fastapi.responses import StreamingResponse
return StreamingResponse(
board_store.iter_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.post("/api/monitor/board/refresh")
async def api_monitor_board_refresh():
_schedule_board_refresh()
return {"ok": True, "board_version": board_store.version}
def _require_hub_logged_in(request: Request) -> None:
if password_required() and not validate_session_token(request.cookies.get(SESSION_COOKIE)):
raise HTTPException(status_code=401, detail="未登录中控")
@app.get("/api/instance/open-url")
def api_instance_open_url(
request: Request,
exchange_id: str,
next: str = "/",
embed: str = "",
hub_theme: str = "",
):
"""已登录中控时生成实例 SSO 打开链接(2h 有效、单次使用,复用 HUB_BRIDGE_TOKEN)。"""
_require_hub_logged_in(request)
if not HUB_BRIDGE_TOKEN:
raise HTTPException(status_code=503, detail="未配置 HUB_BRIDGE_TOKEN,无法签发实例打开链接")
ex = _find_exchange(exchange_id)
if not ex:
raise HTTPException(status_code=404, detail="未知交易所 id")
base = browser_url((ex.get("flask_url") or "").strip()).rstrip("/")
if not base:
raise HTTPException(status_code=400, detail="该账户未配置 flask_url")
ex_key = (ex.get("key") or "").strip().lower()
if not ex_key:
raise HTTPException(status_code=400, detail="该账户缺少 key(用于 SSO 校验)")
nxt = safe_next_path(next)
token = mint_hub_sso_token(ex_key, nxt)
if not token:
raise HTTPException(status_code=503, detail="签发 SSO 失败")
params = {"token": token, "next": nxt}
if (embed or "").strip().lower() in ("1", "true", "yes", "on"):
params["embed"] = "1"
ht = (hub_theme or "").strip().lower()
if ht in ("light", "dark"):
params["hub_theme"] = ht
q = urlencode(params)
return {
"ok": True,
"url": f"{base}/hub-sso?{q}",
"expires_in": HUB_SSO_TTL_SEC,
"exchange_id": exchange_id,
"exchange_key": ex_key,
}
class CloseAllBody(BaseModel):
exclude_ids: list[str] = Field(default_factory=list)
class ClosePositionBody(BaseModel):
symbol: str
side: str
class CancelOrderBody(BaseModel):
symbol: str
order_id: str
channel: str = "regular"
class CancelSymbolOrdersBody(BaseModel):
symbol: str
scope: str = "all"
class PlaceTpslBody(BaseModel):
symbol: str
side: str
stop_loss: float
take_profit: float
contracts: float | None = None
class TrendPlanActionBody(BaseModel):
plan_id: int
breakeven_offset_pct: float | None = None
def _flask_hub_messages(parsed: dict | None) -> tuple[bool, str]:
if not isinstance(parsed, dict):
return False, "实例返回无效"
msgs = list(parsed.get("messages") or [])
if parsed.get("msg"):
msgs.insert(0, str(parsed["msg"]))
if parsed.get("error"):
msgs.append(str(parsed["error"]))
ok = parsed.get("ok") is not False
if parsed.get("ok") is True:
ok = True
elif parsed.get("ok") is False:
ok = False
else:
for m in msgs:
if any(
k in str(m)
for k in ("失败", "错误", "无法", "缺少", "过期", "未找到", "不允许", "异常")
):
ok = False
break
text = "".join(str(x) for x in msgs if x) or ("成功" if ok else "操作失败")
return ok, text
@app.post("/api/trend/{exchange_id}/stop")
async def api_trend_plan_stop(exchange_id: str, body: TrendPlanActionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
if "trend" not in (ex.get("capabilities") or []):
raise HTTPException(status_code=400, detail="该账户未启用趋势计划监控")
pid = int(body.plan_id)
async with httpx.AsyncClient() as client:
parsed = await _fetch_flask_json(
client, ex, f"/api/hub/trend/stop/{pid}", method="POST"
)
ok, text = _flask_hub_messages(parsed)
_schedule_board_refresh()
return {"ok": ok, "message": text, "payload": parsed}
@app.post("/api/trend/{exchange_id}/breakeven")
async def api_trend_plan_breakeven(exchange_id: str, body: TrendPlanActionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
if "trend" not in (ex.get("capabilities") or []):
raise HTTPException(status_code=400, detail="该账户未启用趋势计划监控")
pid = int(body.plan_id)
data = {}
if body.breakeven_offset_pct is not None:
data["breakeven_offset_pct"] = str(body.breakeven_offset_pct)
async with httpx.AsyncClient() as client:
parsed = await _fetch_flask_json(
client,
ex,
f"/api/hub/trend/breakeven/{pid}",
method="POST",
data=data,
)
ok, text = _flask_hub_messages(parsed)
_schedule_board_refresh()
return {"ok": ok, "message": text, "payload": parsed}
@app.post("/api/orders/{exchange_id}/cancel")
async def api_cancel_order(exchange_id: str, body: CancelOrderBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/cancel"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={
"symbol": body.symbol,
"order_id": body.order_id,
"channel": body.channel or "regular",
},
timeout=60.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/orders/{exchange_id}/cancel-symbol")
async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/cancel-symbol"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={"symbol": body.symbol, "scope": body.scope or "all"},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/close/{exchange_id}/position")
async def api_close_position(exchange_id: str, body: ClosePositionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
sym = (body.symbol or "").strip()
side = (body.side or "").strip().lower()
if not sym:
raise HTTPException(status_code=400, detail="symbol 不能为空")
if side not in ("long", "short"):
raise HTTPException(status_code=400, detail="side 须为 long 或 short")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-position"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={"symbol": sym, "side": side},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/orders/{exchange_id}/place-tpsl")
async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/place-tpsl"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={
"symbol": body.symbol,
"side": body.side,
"stop_loss": body.stop_loss,
"take_profit": body.take_profit,
"contracts": body.contracts,
},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/close/{exchange_id}")
async def api_close_exchange(exchange_id: str):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
async with httpx.AsyncClient() as client:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
body = r.json()
except Exception:
body = {"raw": (r.text or "")[:2000]}
out = {"exchange": ex, "status_code": r.status_code, "payload": body}
_schedule_board_refresh()
return out
@app.post("/api/close-all")
async def api_close_all(body: CloseAllBody | None = Body(default=None)):
excl = set(body.exclude_ids if body else [])
excl |= env_force_disabled_ids()
targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl]
async with httpx.AsyncClient() as client:
async def one(ex: dict):
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
try:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload}
except Exception as e:
return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)}
results = await asyncio.gather(*[one(ex) for ex in targets])
_schedule_board_refresh()
return {"results": list(results)}
def _trade_removed_response():
"""旧版前端或缓存页面仍会请求 /api/trade/*,勿解析表单,直接返回说明。"""
return JSONResponse(
{
"ok": False,
"result": {
"ok": False,
"messages": [
"中控已移除下单区。请在监控卡片点击「实例」,"
"进入对应 crypto_monitor_* 网页添加关键位或下单。"
],
},
"deprecated": True,
},
status_code=410,
)
@app.get("/api/ping")
def api_ping():
return {
"ok": True,
"service": "manual-trading-hub",
"build": HUB_BUILD,
"trade_ui": False,
"features": ["monitor", "settings", "auth", "board_sse"],
"board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL,
"board_version": board_store.version,
"board_aggregating": board_store.aggregating,
"board_updated_at": (board_store.payload or {}).get("updated_at")
if isinstance(board_store.payload, dict)
else None,
"board_error": board_store.last_error,
"password_required": password_required(),
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""),
}
@app.post("/api/trade/order/{exchange_id}")
@app.post("/api/trade/key/{exchange_id}")
@app.post("/api/trade/trend/preview/{exchange_id}")
@app.post("/api/trade/trend/execute/{exchange_id}")
async def api_trade_removed(exchange_id: str):
return _trade_removed_response()
@app.get("/api/trade/meta/{exchange_id}")
@app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}")
async def api_trade_removed_get(exchange_id: str, preview_id: str = ""):
return _trade_removed_response()
def main():
import uvicorn
print(
f"manual-trading-hub start build={HUB_BUILD} listen={HUB_HOST}:{HUB_PORT}",
flush=True,
)
uvicorn.run(app, host=HUB_HOST, port=HUB_PORT, log_level="info", access_log=False)
if __name__ == "__main__":
main()