中控增加下单,关键位,系统设置

This commit is contained in:
dekun
2026-05-22 10:04:28 +08:00
parent ed6b56ff87
commit 46f73fce43
19 changed files with 1844 additions and 568 deletions
+231 -165
View File
@@ -1,27 +1,6 @@
"""
中控:聚合各子账户 /status,转发紧急全平
默认 **HUB_HOST=0.0.0.0** 且 **HUB_TRUST_LAN=开启**,便于局域网内浏览器访问;中间件仍拒绝非公网、非 RFC1918 私网的来源(本机 127.0.0.1 始终允许)。
若仅需本机访问,请设置:HUB_HOST=127.0.0.1 或 HUB_TRUST_LAN=0false/off)。
与仓库根目录下四个策略/监控项目对应时,中控默认聚合的子代理地址为 127.0.0.1:1520015203
(与各 crypto_monitor_* 里 Flask 的 APP_PORT 错开;Flask 仍用各自 .env 的 APP_HOST/APP_PORT)。
crypto_monitor_binance → 子代理建议 15200
crypto_monitor_okx → 子代理建议 15201
crypto_monitor_gate → 子代理建议 15202
crypto_monitor_gate_bot→ 子代理建议 15203
各目录单独启动 agent.py 时设置 PORT=上述端口(环境变量名是 PORT,不是 APP_PORT),与 Flask 并存。
环境变量:
HUB_PORT 默认 5100
HUB_HOST 默认 0.0.0.0(局域网可连);改为 127.0.0.1 则仅本机
HUB_AGENTS 逗号分隔子代理 URL,留空则默认 1520015203(避免与 Flask APP_PORT 冲突)
HUB_AGENT_NAMES 可选,逗号分隔显示名,与 URL 顺序对应
HUB_DISABLED_IDS 可选,逗号分隔不参与监控/全平的账户 id(与 /api/agents 中 id 一致),例:暂不用 OKX 时写 1
CONTROL_TOKEN 若子代理启用校验,在此填同一令牌(由中控代发请求头)
HUB_TRUST_LAN 默认开启;设为 0/false/off 则仅允许本机 IP 访问(与 HUB_HOST=0.0.0.0 搭配时仍只放行 127.0.0.1
多账户交易中控:监控区 / 下单区 / 系统设置
转发至各 crypto_monitor_* 的 /api/hub/* 与子代理 /status。
"""
from __future__ import annotations
@@ -35,9 +14,16 @@ from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from settings_store import (
enabled_exchanges,
env_force_disabled_ids,
load_settings,
save_settings,
)
HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0")
HUB_PORT = int(os.getenv("HUB_PORT", "5100"))
CONTROL_TOKEN = (os.getenv("CONTROL_TOKEN") or "").strip()
HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip()
_trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower()
HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off")
DIR = Path(__file__).resolve().parent
@@ -80,58 +66,23 @@ def _client_allowed(host: str | None) -> bool:
return False
def _agent_headers() -> dict[str, str]:
if not CONTROL_TOKEN:
def _hub_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Control-Token": CONTROL_TOKEN}
return {"X-Hub-Token": HUB_BRIDGE_TOKEN}
_DEFAULT_FOLDER_LABELS = (
"币安山寨账户 · crypto_monitor_binance",
"OKX · crypto_monitor_okx",
"Gate训练账户 · crypto_monitor_gate",
"Gate趋势回调 · crypto_monitor_gate_bot",
)
def _agent_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Control-Token": HUB_BRIDGE_TOKEN}
def _ids_from_csv(raw: str | None) -> set[str]:
if not raw or not str(raw).strip():
return set()
return {x.strip() for x in str(raw).split(",") if x.strip()}
def hub_env_excluded_ids() -> set[str]:
"""服务端固定关闭的账户(不参与拉取 /status、不参与全局全平)。"""
return _ids_from_csv(os.getenv("HUB_DISABLED_IDS"))
def merged_excluded_ids(query_exclude: str | None, body_ids: list[str] | None) -> set[str]:
s = hub_env_excluded_ids()
s |= _ids_from_csv(query_exclude)
if body_ids:
s |= {str(x).strip() for x in body_ids if str(x).strip()}
return s
def parse_agents() -> list[dict[str, str]]:
urls_s = (os.getenv("HUB_AGENTS") or "").strip()
if urls_s:
urls = [u.strip() for u in urls_s.split(",") if u.strip()]
else:
urls = [f"http://127.0.0.1:{p}" for p in range(15200, 15204)]
# 注意:若环境变量 HUB_AGENT_NAMES 非空,会完全优先于 _DEFAULT_FOLDER_LABELS(改代码不生效时请检查是否设了该变量)
names_s = (os.getenv("HUB_AGENT_NAMES") or "").strip()
names = [n.strip() for n in names_s.split(",") if n.strip()] if names_s else []
out = []
for i, url in enumerate(urls):
if i < len(names):
name = names[i]
elif i < len(_DEFAULT_FOLDER_LABELS):
name = _DEFAULT_FOLDER_LABELS[i]
else:
name = f"账户{i + 1}"
out.append({"id": str(i), "name": name, "url": url.rstrip("/")})
return out
def _find_exchange(ex_id: str) -> dict | None:
for ex in load_settings().get("exchanges") or []:
if str(ex.get("id")) == str(ex_id):
return ex
return None
app = FastAPI(title="hub", docs_url=None, redoc_url=None)
@@ -147,121 +98,236 @@ async def local_only(request: Request, call_next):
return await call_next(request)
@app.get("/")
def index_page():
def _shell_page():
index = STATIC_DIR / "index.html"
if not index.is_file():
return JSONResponse({"detail": "missing static/index.html"}, status_code=500)
return FileResponse(index)
@app.get("/api/agents")
def api_agents():
return {"agents": parse_agents()}
@app.get("/")
def root_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor")
@app.get("/monitor")
@app.get("/trade")
@app.get("/settings")
def shell_pages():
return _shell_page()
@app.get("/api/settings")
def api_get_settings():
return load_settings()
class SettingsBody(BaseModel):
exchanges: list[dict] = Field(default_factory=list)
@app.post("/api/settings")
def api_save_settings(body: SettingsBody):
data = {"version": 1, "exchanges": body.exchanges}
save_settings(data)
return {"ok": True}
@app.get("/api/settings/meta")
def api_settings_meta():
return {
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN),
"capability_options": ["order", "key", "trend"],
}
async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict:
url = f"{ex['agent_url'].rstrip('/')}/status"
try:
r = await client.get(url, headers=_agent_headers(), timeout=12.0)
body = r.json() if r.content else {}
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": r.status_code == 200,
"agent": body,
"error": body.get("error") if isinstance(body, dict) else None,
}
except Exception as e:
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": False,
"error": str(e),
"agent": None,
}
async def _fetch_flask_json(
client: httpx.AsyncClient, ex: dict, path: str, method: str = "GET", data=None
) -> dict | None:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return None
try:
if method == "GET":
r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=15.0)
else:
r = await client.post(f"{base}{path}", headers=_hub_headers(), data=data, timeout=120.0)
if r.status_code >= 400:
return {"ok": False, "status": r.status_code, "text": (r.text or "")[:500]}
return r.json()
except Exception as e:
return {"ok": False, "error": str(e)}
@app.get("/api/monitor/board")
async def api_monitor_board():
exchanges = enabled_exchanges()
async with httpx.AsyncClient() as client:
agent_rows = await asyncio.gather(*[_fetch_agent_status(client, ex) for ex in exchanges])
out = []
for ex, agent_row in zip(exchanges, agent_rows):
hub_mon = await _fetch_flask_json(client, ex, "/api/hub/monitor")
meta = await _fetch_flask_json(client, ex, "/api/hub/meta")
key_prices = None
if "key" in (ex.get("capabilities") or []):
snap = await _fetch_flask_json(client, ex, "/api/price_snapshot")
if isinstance(snap, dict):
key_prices = snap.get("key_prices")
out.append(
{
**agent_row,
"review_url": ex.get("review_url") or "",
"hub_monitor": hub_mon,
"meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta,
"key_prices": key_prices,
}
)
return {"rows": out, "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds")}
class CloseAllBody(BaseModel):
exclude_ids: list[str] = Field(default_factory=list)
@app.get("/api/snapshot")
async def api_snapshot(
exclude_ids: str | None = Query(
default=None,
description="逗号分隔,浏览器侧再关闭的账户 id,与服务端 HUB_DISABLED_IDS 合并",
),
):
excl = merged_excluded_ids(exclude_ids, None)
agents = [a for a in parse_agents() if a["id"] not in excl]
headers = _agent_headers()
async def one(client: httpx.AsyncClient, a: dict[str, str]) -> dict:
url = f"{a['url']}/status"
try:
r = await client.get(url, headers=headers, timeout=10.0)
body = None
if r.content:
try:
body = r.json()
except Exception as je:
preview = (r.text or "")[:400].replace("\n", " ")
return {
"id": a["id"],
"name": a["name"],
"url": a["url"],
"http_ok": False,
"status_code": r.status_code,
"error": f"子代理返回非 JSON{je})。响应片段: {preview!r}",
"payload": None,
}
return {
"id": a["id"],
"name": a["name"],
"url": a["url"],
"http_ok": r.status_code == 200,
"status_code": r.status_code,
"payload": body,
}
except Exception as e:
return {
"id": a["id"],
"name": a["name"],
"url": a["url"],
"http_ok": False,
"status_code": None,
"error": str(e),
"payload": None,
}
@app.post("/api/close/{exchange_id}")
async def api_close_exchange(exchange_id: str):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
async with httpx.AsyncClient() as client:
rows = await asyncio.gather(*[one(client, a) for a in agents])
env_ex = sorted(hub_env_excluded_ids())
return {"rows": list(rows), "env_excluded_ids": env_ex}
@app.post("/api/close/{agent_id}")
async def api_close_one(agent_id: str):
agents = parse_agents()
target = next((a for a in agents if a["id"] == agent_id), None)
if not target:
raise HTTPException(status_code=404, detail="unknown agent")
headers = _agent_headers()
url = f"{target['url']}/emergency/close-all"
try:
async with httpx.AsyncClient() as client:
r = await client.post(url, headers=headers, timeout=120.0)
try:
body = r.json()
except Exception:
body = {"raw": r.text[:2000]}
return {"agent": target, "status_code": r.status_code, "payload": body}
except Exception as e:
raise HTTPException(status_code=502, detail=str(e)) from e
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
body = r.json()
except Exception:
body = {"raw": (r.text or "")[:2000]}
return {"exchange": ex, "status_code": r.status_code, "payload": body}
@app.post("/api/close-all")
async def api_close_all(body: CloseAllBody | None = Body(default=None)):
excl = merged_excluded_ids(None, body.exclude_ids if body else None)
agents = [a for a in parse_agents() if a["id"] not in excl]
headers = _agent_headers()
async def post_close(client: httpx.AsyncClient, a: dict[str, str]) -> dict:
url = f"{a['url']}/emergency/close-all"
try:
r = await client.post(url, headers=headers, timeout=120.0)
try:
body = r.json()
except Exception:
body = {"raw": r.text[:2000]}
return {"id": a["id"], "name": a["name"], "status_code": r.status_code, "payload": body}
except Exception as e:
return {"id": a["id"], "name": a["name"], "status_code": None, "error": str(e)}
excl = set(body.exclude_ids if body else [])
excl |= env_force_disabled_ids()
targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl]
async with httpx.AsyncClient() as client:
results = await asyncio.gather(*[post_close(client, a) for a in agents])
async def one(ex: dict):
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
try:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload}
except Exception as e:
return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)}
results = await asyncio.gather(*[one(ex) for ex in targets])
return {"results": list(results)}
@app.get("/api/trade/meta/{exchange_id}")
async def api_trade_meta(exchange_id: str):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
async with httpx.AsyncClient() as client:
meta = await _fetch_flask_json(client, ex, "/api/hub/meta")
return {"exchange": ex, "meta": meta}
@app.post("/api/trade/order/{exchange_id}")
async def api_trade_order(exchange_id: str, request: Request):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
form = await request.form()
async with httpx.AsyncClient() as client:
result = await _fetch_flask_json(client, ex, "/api/hub/add_order", "POST", dict(form))
return {"exchange": ex, "result": result}
@app.post("/api/trade/key/{exchange_id}")
async def api_trade_key(exchange_id: str, request: Request):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
if "key" not in (ex.get("capabilities") or []):
raise HTTPException(status_code=400, detail="该账户不支持关键位")
form = await request.form()
async with httpx.AsyncClient() as client:
result = await _fetch_flask_json(client, ex, "/api/hub/add_key", "POST", dict(form))
return {"exchange": ex, "result": result}
@app.post("/api/trade/trend/preview/{exchange_id}")
async def api_trade_trend_preview(exchange_id: str, request: Request):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
if "trend" not in (ex.get("capabilities") or []):
raise HTTPException(status_code=400, detail="该账户不支持趋势回调")
form = await request.form()
async with httpx.AsyncClient() as client:
result = await _fetch_flask_json(client, ex, "/api/hub/trend/preview", "POST", dict(form))
return {"exchange": ex, "result": result}
@app.post("/api/trade/trend/execute/{exchange_id}")
async def api_trade_trend_execute(exchange_id: str, request: Request):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
form = await request.form()
async with httpx.AsyncClient() as client:
result = await _fetch_flask_json(client, ex, "/api/hub/trend/execute", "POST", dict(form))
return {"exchange": ex, "result": result}
@app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}")
async def api_trade_trend_preview_get(exchange_id: str, preview_id: str):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
async with httpx.AsyncClient() as client:
result = await _fetch_flask_json(client, ex, f"/api/hub/trend/preview/{preview_id}")
return {"exchange": ex, "result": result}
def main():
import uvicorn