Files
crypto_monitor/manual_trading_hub/hub.py
T
2026-06-22 16:50:09 +08:00

2566 lines
85 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
多账户交易中控:监控区 / 系统设置。
聚合各实例监控数据与子代理 /status;下单请在各实例网页操作。
"""
from __future__ import annotations
import asyncio
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days
from hub_ohlcv_lib import (
CHART_TIMEFRAME_ORDER,
CHART_TIMEFRAMES,
bar_limit_for_timeframe,
chart_chunk_limit,
chart_initial_limit,
chart_memory_cap,
retention_policy_meta,
)
from hub_volume_rank_lib import (
TOP_N_DEFAULT,
_exchange_rank_row_stale,
cache_needs_refresh,
format_volume_quote,
get_cached_rank,
load_volume_rank_cache,
merge_exchange_rank,
rank_date_label,
save_volume_rank_cache,
seconds_until_next_reset,
volume_rank_reset_hour,
)
from hub_symbol_archive_lib import (
ARCHIVE_DEFAULT_TIMEFRAME,
ARCHIVE_QUOTES_MAX,
ARCHIVE_SEED_LOOKBACK_DAYS,
ARCHIVE_SYNC_INTERVAL_SEC,
ARCHIVE_TIMEFRAMES,
ARCHIVE_TRADE_DAYS,
ARCHIVE_TRADE_LIMIT,
ARCHIVE_VISIBLE_BARS_DEFAULT,
create_review_quote,
delete_review_quote,
init_db as init_archive_db,
list_daily_trades,
list_review_quotes,
list_symbol_rows,
load_symbol_trades,
parse_wall_clock_ms,
resolve_archive_chart,
sync_exchange_symbol_archives,
today_trading_day,
update_review_quote,
upsert_trade_overlay,
)
from hub_entry_plan_lib import (
compute_entry_plan_stats,
create_entry_plan,
delete_entry_plan,
get_entry_plan,
init_db as init_entry_plan_db,
list_entry_plans,
meta_payload as entry_plan_meta_payload,
update_entry_plan,
)
from hub_macro_calendar_lib import (
MACRO_EVENT_LABELS,
MACRO_EVENT_TYPES,
create_event as create_macro_event,
delete_event as delete_macro_event,
init_db as init_macro_calendar_db,
list_active_alerts,
list_events as list_macro_events,
update_event as update_macro_event,
)
from env_load import load_hub_dotenv
load_hub_dotenv()
import httpx
from fastapi import Body, FastAPI, HTTPException, Request
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from settings_store import (
enabled_exchanges,
env_force_disabled_ids,
load_settings,
normalize_display_prefs,
save_settings,
)
from hub_web_auth import (
SESSION_COOKIE,
SESSION_MAX_AGE_SEC,
clear_session_cookie,
cookie_secure_for_request,
create_session_token,
embed_allowed,
embed_frame_ancestors,
is_public_path,
password_required,
set_session_cookie,
validate_session_token,
expected_username,
verify_credentials,
)
from hub_sso import HUB_SSO_TTL_SEC, mint_hub_sso_token, safe_next_path
from url_public import browser_url, default_review_url, public_origin
from urllib.parse import urlencode
from hub_board_cache import HUB_BOARD_POLL_INTERVAL, board_store
from hub_dashboard_cache import dashboard_store
from hub_dashboard import DASHBOARD_POLL_INTERVAL_SEC
from hub_chart_cache import (
HUB_CHART_POLL_INTERVAL,
HUB_CHART_WATCH_TTL_SEC,
chart_poll_store,
parse_series_key,
)
try:
from exchange_orders import symbols_match as _symbols_match
except ImportError:
def _symbols_match(position_symbol: str, order_symbol: str) -> bool:
a = (position_symbol or "").strip().upper()
b = (order_symbol or "").strip().upper()
return bool(a and b and a == b)
HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0")
HUB_PORT = int(os.getenv("HUB_PORT", "5100"))
HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip()
_trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower()
HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off")
_allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower()
# 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护
HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on")
DIR = Path(__file__).resolve().parent
HUB_BUILD = "20260607-hub-archive"
_archive_sync_stop: asyncio.Event | None = None
_archive_sync_task: asyncio.Task | None = None
_last_archive_sync: dict | None = None
_volume_rank_stop: asyncio.Event | None = None
_volume_rank_task: asyncio.Task | None = None
_volume_rank_cache: dict | None = None
HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8"))
HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10"))
HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45"))
_board_key_prices_raw = (os.getenv("HUB_BOARD_KEY_PRICES", "true") or "").strip().lower()
HUB_BOARD_KEY_PRICES = _board_key_prices_raw in ("1", "true", "yes", "on")
def _is_local(host: str | None) -> bool:
if not host:
return False
h = host.lower()
return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1")
def _ipv4_rfc1918_private(host: str) -> bool:
h = host.lower()
if h.startswith("::ffff:"):
h = h[7:]
parts = h.split(".")
if len(parts) != 4:
return False
try:
a, b, c, d = (int(x) for x in parts)
except ValueError:
return False
if any(x < 0 or x > 255 for x in (a, b, c, d)):
return False
if a == 10:
return True
if a == 172 and 16 <= b <= 31:
return True
if a == 192 and b == 168:
return True
return False
def _client_allowed(host: str | None) -> bool:
if _is_local(host):
return True
if HUB_TRUST_LAN and host and _ipv4_rfc1918_private(host):
return True
return False
def _hub_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Hub-Token": HUB_BRIDGE_TOKEN}
def _agent_headers() -> dict[str, str]:
if not HUB_BRIDGE_TOKEN:
return {}
return {"X-Control-Token": HUB_BRIDGE_TOKEN}
def _find_exchange(ex_id: str) -> dict | None:
for ex in load_settings().get("exchanges") or []:
if str(ex.get("id")) == str(ex_id):
return ex
return None
async def _run_chart_poll() -> dict:
keys = chart_poll_store.active_series_keys()
if not keys:
return {"ok": True, "series_count": 0, "polled": 0}
polled = 0
errors: list[str] = []
for key in keys:
parsed = parse_series_key(key)
if not parsed:
continue
ex_k, sym, tf = parsed
ex = _find_exchange_by_key(ex_k)
if not ex or not ex.get("enabled"):
continue
ex_ref = ex
sym_ref = sym
tf_ref = tf
def remote_fetch(**kwargs) -> dict:
tf_use = kwargs.get("timeframe") or tf_ref
return _fetch_instance_ohlcv_sync(
ex_ref,
symbol=kwargs.get("symbol") or sym_ref,
timeframe=tf_use,
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)),
)
try:
result = await asyncio.to_thread(
resolve_chart_bars,
ex_k,
sym,
tf,
remote_fetch,
force_refresh=False,
tail_refresh=True,
)
polled += 1
chart_poll_store.note_series_result(
ex_k,
sym,
tf,
ok=bool(result.get("ok")),
fetched=int(result.get("fetched") or 0),
error=None if result.get("ok") else str(result.get("msg") or "poll_failed"),
candles=result.get("candles") if result.get("ok") else None,
price_tick=result.get("price_tick"),
)
if not result.get("ok"):
errors.append(f"{key}:{result.get('msg')}")
except Exception as e:
chart_poll_store.note_series_result(ex_k, sym, tf, ok=False, error=str(e))
errors.append(f"{key}:{e}")
out: dict = {"ok": True, "series_count": len(keys), "polled": polled}
if errors:
out["errors"] = errors[:8]
return out
async def _run_board_aggregate() -> dict:
try:
body = await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT)
try:
from hub_fund_history_lib import record_fund_snapshot_from_board
await asyncio.to_thread(record_fund_snapshot_from_board, body.get("rows") or [])
except Exception:
pass
return {"ok": True, **body}
except asyncio.TimeoutError:
return {
"ok": False,
"rows": [],
"error": "board_timeout",
"msg": (
f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。"
"请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT"
),
"updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"),
}
def _schedule_board_refresh() -> None:
board_store.request_refresh()
dashboard_store.request_refresh()
async def _run_archive_sync_once() -> dict:
global _last_archive_sync
init_archive_db()
settings = load_settings()
targets = enabled_exchanges(settings)
results: list[dict] = []
for ex in targets:
ex_key = str(ex.get("key") or "").strip().lower()
if not ex_key:
continue
trades_resp = await asyncio.to_thread(
_fetch_instance_trades_archive_sync,
ex,
days=ARCHIVE_TRADE_DAYS,
limit=ARCHIVE_TRADE_LIMIT,
)
if not trades_resp.get("ok"):
st = trades_resp.get("status")
msg = (
trades_resp.get("msg")
or trades_resp.get("error")
or trades_resp.get("detail")
or "拉取交易失败"
)
if st == 404:
msg = (
"HTTP 404:该 Flask 未注册 /api/hub/trades/archive。"
"请在仓库根目录 git pull 后 pm2 restart crypto_gate crypto_gate_bot"
)
results.append(
{
"exchange_key": ex_key,
"name": ex.get("name"),
"ok": False,
"status": st,
"msg": msg,
}
)
continue
trades = trades_resp.get("trades") or []
for t in trades:
if isinstance(t, dict):
t["exchange_key"] = ex_key
def remote_fetch(**kwargs):
return _fetch_instance_ohlcv_sync(
ex,
symbol=kwargs.get("symbol") or "",
timeframe=kwargs.get("timeframe") or "5m",
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or 500),
)
r = await asyncio.to_thread(
sync_exchange_symbol_archives,
ex_key,
trades,
remote_fetch,
)
r["name"] = ex.get("name")
r["trade_count"] = len(trades)
results.append(r)
out = {
"ok": True,
"exchanges": len(targets),
"results": results,
"updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"),
}
_last_archive_sync = out
return out
def _fetch_instance_volume_rank_sync(ex: dict, *, top_n: int = TOP_N_DEFAULT) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"top": str(int(top_n))}
url = f"{base}/api/hub/volume-rank?{urlencode(params)}"
try:
with httpx.Client(timeout=max(HUB_FLASK_TIMEOUT, 120.0)) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
parsed.setdefault("status", r.status_code)
return parsed
data = r.json() if r.content else {}
return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def _get_volume_rank_cache() -> dict:
global _volume_rank_cache
if _volume_rank_cache is None:
_volume_rank_cache = load_volume_rank_cache()
return _volume_rank_cache
def _refresh_volume_ranks(*, force: bool = False) -> dict:
global _volume_rank_cache
expected = rank_date_label()
cache = _get_volume_rank_cache()
targets = enabled_exchanges(load_settings())
required_keys = [
str(ex.get("key") or "").strip().lower()
for ex in targets
if ex.get("enabled") and str(ex.get("key") or "").strip()
]
if not force and not cache_needs_refresh(
cache, expected_rank_date=expected, required_keys=required_keys
):
return {
"ok": True,
"skipped": True,
"rank_date": cache.get("rank_date"),
"updated_at": cache.get("updated_at"),
}
errors: list[str] = []
for ex in targets:
ex_key = str(ex.get("key") or "").strip().lower()
if not ex_key or not ex.get("enabled"):
continue
resp = _fetch_instance_volume_rank_sync(ex, top_n=TOP_N_DEFAULT)
if resp.get("ok") and resp.get("items"):
cache = merge_exchange_rank(cache, ex_key, resp)
else:
msg = str(resp.get("msg") or resp.get("error") or "拉取失败")
if resp.get("ok") and not resp.get("items"):
msg = msg if msg != "拉取失败" else "无有效成交额数据"
errors.append(f"{ex_key}:{msg}")
exchanges = dict(cache.get("exchanges") or {})
prev = dict(exchanges.get(ex_key) or {})
prev["error"] = msg
if not prev.get("items"):
prev["items"] = []
exchanges[ex_key] = prev
cache["exchanges"] = exchanges
cache["rank_date"] = expected
save_volume_rank_cache(cache)
_volume_rank_cache = cache
out: dict = {
"ok": True,
"rank_date": expected,
"exchanges": len(targets),
"updated_at": cache.get("updated_at"),
}
if errors:
out["errors"] = errors[:8]
return out
async def _volume_rank_loop() -> None:
global _volume_rank_stop
stop = _volume_rank_stop
if stop is None:
return
try:
await asyncio.to_thread(_refresh_volume_ranks, force=False)
except Exception:
pass
while not stop.is_set():
try:
wait_sec = seconds_until_next_reset()
await asyncio.wait_for(stop.wait(), timeout=wait_sec)
break
except asyncio.TimeoutError:
pass
if stop.is_set():
break
try:
await asyncio.to_thread(_refresh_volume_ranks, force=True)
except Exception:
pass
async def _archive_sync_loop() -> None:
global _archive_sync_stop
stop = _archive_sync_stop
if stop is None:
return
init_archive_db()
while not stop.is_set():
try:
await _run_archive_sync_once()
except Exception:
pass
try:
await asyncio.wait_for(stop.wait(), timeout=float(ARCHIVE_SYNC_INTERVAL_SEC))
except asyncio.TimeoutError:
pass
@asynccontextmanager
async def _hub_lifespan(_app: FastAPI):
global _archive_sync_stop, _archive_sync_task, _volume_rank_stop, _volume_rank_task
await board_store.start(_run_board_aggregate)
await dashboard_store.start(_run_dashboard_aggregate)
await chart_poll_store.start(_run_chart_poll)
_archive_sync_stop = asyncio.Event()
_archive_sync_task = asyncio.create_task(_archive_sync_loop(), name="hub-archive-sync")
_volume_rank_stop = asyncio.Event()
_volume_rank_task = asyncio.create_task(_volume_rank_loop(), name="hub-volume-rank")
try:
yield
finally:
if _archive_sync_stop:
_archive_sync_stop.set()
if _archive_sync_task:
_archive_sync_task.cancel()
try:
await _archive_sync_task
except asyncio.CancelledError:
pass
_archive_sync_task = None
_archive_sync_stop = None
if _volume_rank_stop:
_volume_rank_stop.set()
if _volume_rank_task:
_volume_rank_task.cancel()
try:
await _volume_rank_task
except asyncio.CancelledError:
pass
_volume_rank_task = None
_volume_rank_stop = None
await chart_poll_store.stop()
await dashboard_store.stop()
await board_store.stop()
app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None, lifespan=_hub_lifespan)
STATIC_DIR = DIR / "static"
_REPO_STATIC = _REPO_ROOT / "static"
_AI_REVIEW_RENDER_JS = _REPO_STATIC / "ai_review_render.js"
_ACCOUNT_RISK_BADGE_CSS = _REPO_STATIC / "account_risk_badge.css"
_ACCOUNT_RISK_BADGE_JS = _REPO_STATIC / "account_risk_badge.js"
@app.get("/assets/account_risk_badge.css")
def hub_account_risk_badge_css():
"""与四所实例共用仓库根 static/account_risk_badge.css。"""
if not _ACCOUNT_RISK_BADGE_CSS.is_file():
raise HTTPException(status_code=404, detail="account_risk_badge.css not found")
return FileResponse(
str(_ACCOUNT_RISK_BADGE_CSS),
media_type="text/css; charset=utf-8",
)
@app.get("/assets/account_risk_badge.js")
def hub_account_risk_badge_js():
"""与四所实例共用仓库根 static/account_risk_badge.js。"""
if not _ACCOUNT_RISK_BADGE_JS.is_file():
raise HTTPException(status_code=404, detail="account_risk_badge.js not found")
return FileResponse(
str(_ACCOUNT_RISK_BADGE_JS),
media_type="application/javascript; charset=utf-8",
)
@app.get("/assets/ai_review_render.js")
def hub_ai_review_render_js():
"""与四所实例共用仓库根 static/ai_review_render.js(须在 /assets mount 之前注册)。"""
if not _AI_REVIEW_RENDER_JS.is_file():
raise HTTPException(status_code=404, detail="ai_review_render.js not found")
return FileResponse(
str(_AI_REVIEW_RENDER_JS),
media_type="application/javascript; charset=utf-8",
)
if STATIC_DIR.is_dir():
app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets")
@app.middleware("http")
async def local_only(request: Request, call_next):
if HUB_ALLOW_PUBLIC:
return await call_next(request)
peer = request.client.host if request.client else None
if not _client_allowed(peer):
return JSONResponse({"detail": "forbidden"}, status_code=403)
return await call_next(request)
@app.middleware("http")
async def embed_frame_headers(request: Request, call_next):
response = await call_next(request)
if embed_allowed():
ancestors = embed_frame_ancestors()
if ancestors == "*":
response.headers["Content-Security-Policy"] = "frame-ancestors *"
else:
response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {ancestors}"
return response
@app.middleware("http")
async def hub_password_gate(request: Request, call_next):
if not password_required():
return await call_next(request)
path = request.url.path
if is_public_path(path, request.method):
return await call_next(request)
token = request.cookies.get(SESSION_COOKIE)
if validate_session_token(token):
return await call_next(request)
if path.startswith("/api/"):
return JSONResponse({"detail": "未登录", "login_required": True}, status_code=401)
from fastapi.responses import RedirectResponse
nxt = path if path.startswith("/") else "/monitor"
return RedirectResponse(f"/login?next={nxt}", status_code=302)
def _shell_page():
index = STATIC_DIR / "index.html"
if not index.is_file():
return JSONResponse({"detail": "missing static/index.html"}, status_code=500)
return FileResponse(index)
def _login_page():
login = STATIC_DIR / "login.html"
if not login.is_file():
return JSONResponse({"detail": "missing static/login.html"}, status_code=500)
return FileResponse(login)
class LoginBody(BaseModel):
username: str = ""
password: str = ""
@app.get("/api/auth/status")
def api_auth_status(request: Request):
required = password_required()
logged_in = not required or validate_session_token(request.cookies.get(SESSION_COOKIE))
return {
"required": required,
"logged_in": logged_in,
}
@app.post("/api/auth/login")
def api_auth_login(body: LoginBody, request: Request):
if not password_required():
return {"ok": True, "auth_disabled": True}
if not verify_credentials(body.username, body.password):
raise HTTPException(status_code=401, detail="用户名或密码错误")
token = create_session_token(body.username)
embed = (request.headers.get("x-hub-embed") or "").strip() == "1"
resp = JSONResponse({"ok": True, "session_token": token, "embed": embed})
set_session_cookie(resp, request, token, embed=embed)
return resp
@app.get("/embed-auth")
def embed_auth_login(request: Request, token: str = "", next: str = "/monitor"):
"""
嵌入式打开:父页跨域 fetch 登录时 Cookie 可能写不进 iframe
用 session_token 在本页做一次导航,在 iframe 内写入 hub_sess。
"""
from fastapi.responses import RedirectResponse
dest = safe_next_path(next)
if not password_required():
return RedirectResponse(dest, status_code=302)
if not validate_session_token(token):
q = urlencode({"next": dest, "embed": "1"})
return RedirectResponse(f"/login?{q}", status_code=302)
resp = RedirectResponse(dest, status_code=302)
set_session_cookie(resp, request, token, embed=True)
return resp
@app.post("/api/auth/logout")
def api_auth_logout(request: Request):
embed = (request.headers.get("x-hub-embed") or "").strip() == "1"
resp = JSONResponse({"ok": True})
clear_session_cookie(resp, request, embed=embed)
return resp
@app.get("/login")
def login_page():
return _login_page()
@app.get("/")
def root_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor")
@app.get("/monitor")
@app.get("/plan")
@app.get("/market")
@app.get("/archive")
@app.get("/dashboard")
@app.get("/funds")
@app.get("/ai")
@app.get("/settings")
def shell_pages():
return _shell_page()
def _all_exchanges_for_ai() -> list:
"""AI 聚合用:含未启用账户(标记未监控)。"""
return list(load_settings().get("exchanges") or [])
from hub_ai.routes import create_hub_ai_router
from hub_dashboard import build_dashboard_payload, default_trading_day
app.include_router(create_hub_ai_router(load_all_exchanges=_all_exchanges_for_ai))
async def _run_dashboard_aggregate() -> dict:
try:
return await asyncio.to_thread(
build_dashboard_payload,
_all_exchanges_for_ai(),
trading_day=default_trading_day(),
)
except Exception as exc:
return {"ok": False, "msg": str(exc), "error": "aggregate_failed"}
def _schedule_dashboard_refresh() -> None:
dashboard_store.request_refresh()
@app.get("/api/dashboard/daily")
def api_dashboard_daily(trading_day: str = ""):
day = (trading_day or "").strip()[:10] or default_trading_day()
if not (trading_day or "").strip():
return dashboard_store.snapshot_dict()
try:
payload = build_dashboard_payload(
_all_exchanges_for_ai(),
trading_day=day,
)
except Exception as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return {**payload, "dashboard_version": dashboard_store.version}
@app.get("/api/dashboard/stream")
async def api_dashboard_stream():
from fastapi.responses import StreamingResponse
return StreamingResponse(
dashboard_store.iter_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.post("/api/dashboard/refresh")
async def api_dashboard_refresh():
_schedule_dashboard_refresh()
return {"ok": True, "dashboard_version": dashboard_store.version}
@app.get("/trade")
def trade_removed_redirect():
from fastapi.responses import RedirectResponse
return RedirectResponse("/monitor", status_code=302)
@app.get("/api/settings")
def api_get_settings():
return load_settings()
class SettingsDisplayBody(BaseModel):
show_account_pnl: bool = True
show_nav_funds: bool = True
show_nav_dashboard: bool = True
class SettingsBody(BaseModel):
exchanges: list[dict] = Field(default_factory=list)
display: SettingsDisplayBody | None = None
@app.post("/api/settings")
def api_save_settings(body: SettingsBody):
force_off = env_force_disabled_ids()
to_save = []
for ex in body.exchanges:
row = dict(ex)
eid = str(row.get("id", "")).strip()
if eid in force_off:
row["enabled"] = False
row.pop("env_disabled", None)
to_save.append(row)
existing = load_settings()
display = normalize_display_prefs(existing.get("display"))
if body.display is not None:
display = normalize_display_prefs(body.display.model_dump())
save_settings({"version": 1, "exchanges": to_save, "display": display})
return {"ok": True, "settings": load_settings()}
def _find_exchange_by_key(exchange_key: str) -> dict | None:
key = (exchange_key or "").strip().lower()
if not key:
return None
for ex in load_settings().get("exchanges") or []:
if str(ex.get("key") or "").strip().lower() == key:
return ex
if str(ex.get("id") or "").strip() == exchange_key.strip():
return ex
return None
def _fetch_instance_trades_archive_sync(
ex: dict,
*,
days: int = 365,
limit: int = 2000,
) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"days": str(int(days)), "limit": str(int(limit))}
url = f"{base}/api/hub/trades/archive?{urlencode(params)}"
try:
with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
parsed.setdefault("status", r.status_code)
return parsed
data = r.json() if r.content else {}
if isinstance(data, dict):
data.setdefault("ok", True)
return data
return {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def _fetch_instance_ohlcv_sync(
ex: dict,
*,
symbol: str,
timeframe: str,
since_ms: int | None,
limit: int,
) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"symbol": symbol, "timeframe": timeframe, "limit": str(int(limit))}
if since_ms is not None and int(since_ms) > 0:
params["since_ms"] = str(int(since_ms))
url = f"{base}/api/hub/ohlcv?{urlencode(params)}"
try:
with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
return parsed
data = r.json() if r.content else {}
return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
@app.get("/api/chart/meta")
def api_chart_meta():
tfs = [tf for tf in CHART_TIMEFRAME_ORDER if tf in CHART_TIMEFRAMES]
exchanges = []
for ex in enabled_exchanges(load_settings()):
exchanges.append(
{
"id": ex.get("id"),
"key": ex.get("key"),
"name": ex.get("name"),
}
)
return {
"ok": True,
"timeframes": [tf for tf in tfs if tf in CHART_TIMEFRAMES],
"retention_days": retention_days(),
"retention_policy": retention_policy_meta(),
"limits": {tf: bar_limit_for_timeframe(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"initial_limits": {tf: chart_initial_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"chunk_limits": {tf: chart_chunk_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"memory_caps": {tf: chart_memory_cap(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"exchanges": exchanges,
"volume_rank_top_n": TOP_N_DEFAULT,
"volume_rank_reset_hour": volume_rank_reset_hour(),
}
@app.get("/api/chart/volume-rank")
def api_chart_volume_rank(exchange_key: str = "", refresh: str = ""):
force = (refresh or "").strip().lower() in ("1", "true", "yes", "on")
if force:
result = _refresh_volume_ranks(force=True)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "刷新失败")
cache = _get_volume_rank_cache()
ex_k = (exchange_key or "").strip().lower()
targets = enabled_exchanges(load_settings())
required_keys = [
str(ex.get("key") or "").strip().lower()
for ex in targets
if ex.get("enabled") and str(ex.get("key") or "").strip()
]
need_keys = [ex_k] if ex_k else required_keys
if cache_needs_refresh(cache, required_keys=need_keys):
_refresh_volume_ranks(force=True)
cache = _get_volume_rank_cache()
elif ex_k:
row = (cache.get("exchanges") or {}).get(ex_k) or {}
if _exchange_rank_row_stale(row):
_refresh_volume_ranks(force=True)
cache = _get_volume_rank_cache()
if ex_k:
ex = _find_exchange_by_key(ex_k)
if not ex:
raise HTTPException(status_code=400, detail="交易所不存在")
payload = get_cached_rank(cache, ex_k, top_n=TOP_N_DEFAULT)
payload["items"] = [
{**row, "volume_label": format_volume_quote(row.get("volume_quote"))}
for row in payload.get("items") or []
]
payload["reset_hour"] = volume_rank_reset_hour()
err = ((cache.get("exchanges") or {}).get(ex_k) or {}).get("error")
if err and not payload.get("items"):
payload["ok"] = False
payload["msg"] = err
return payload
exchanges_out = {}
for ex in enabled_exchanges(load_settings()):
key = str(ex.get("key") or "").strip().lower()
if not key:
continue
row = get_cached_rank(cache, key, top_n=TOP_N_DEFAULT)
row["name"] = ex.get("name")
row["items"] = [
{**item, "volume_label": format_volume_quote(item.get("volume_quote"))}
for item in row.get("items") or []
]
exchanges_out[key] = row
return {
"ok": True,
"rank_date": cache.get("rank_date"),
"updated_at": cache.get("updated_at"),
"reset_hour": volume_rank_reset_hour(),
"exchanges": exchanges_out,
}
@app.post("/api/chart/volume-rank/refresh")
async def api_chart_volume_rank_refresh():
result = await asyncio.to_thread(_refresh_volume_ranks, force=True)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "刷新失败")
return result
@app.get("/api/chart/ohlcv")
def api_chart_ohlcv(
exchange_key: str = "",
symbol: str = "",
timeframe: str = "1d",
refresh: str = "",
tail: str = "",
limit: int = 0,
before_ms: str = "",
):
ex = _find_exchange_by_key(exchange_key)
if not ex:
raise HTTPException(status_code=400, detail="交易所不存在")
if not ex.get("enabled"):
raise HTTPException(status_code=400, detail="该交易所未启用")
sym = (symbol or "").strip().upper()
if not sym:
raise HTTPException(status_code=400, detail="请输入币种")
ex_key = str(ex.get("key") or "").strip().lower()
force = (refresh or "").strip().lower() in ("1", "true", "yes", "on")
tail_refresh = (tail or "").strip().lower() in ("1", "true", "yes", "on")
lim = int(limit) if int(limit or 0) > 0 else None
bms_raw = (before_ms or "").strip()
bms = None
if bms_raw:
try:
bms = int(bms_raw)
except ValueError:
raise HTTPException(status_code=400, detail="before_ms 无效")
clear_db = force and not tail_refresh and bms is None
def remote_fetch(**kwargs):
tf_use = kwargs.get("timeframe") or timeframe
return _fetch_instance_ohlcv_sync(
ex,
symbol=kwargs.get("symbol") or sym,
timeframe=tf_use,
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)),
)
result = resolve_chart_bars(
ex_key,
sym,
timeframe,
remote_fetch,
force_refresh=force,
tail_refresh=tail_refresh,
clear_db=clear_db,
limit=lim,
before_ms=bms,
)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "K线加载失败")
if not result.get("candles") and result.get("before_ms") is None:
raise HTTPException(status_code=502, detail=result.get("msg") or "无 K 线")
tick = result.get("price_tick")
last = result["candles"][-1] if result.get("candles") else None
result["ohlcv"] = format_ohlcv_detail(
{
"open": last.get("open") if last else None,
"high": last.get("high") if last else None,
"low": last.get("low") if last else None,
"close": last.get("close") if last else None,
"volume": last.get("volume") if last else None,
}
if last
else None,
tick,
)
result["chart_version"] = chart_poll_store.version
result["series_version"] = chart_poll_store.series_version(ex_key, sym, timeframe)
result["chart_poll_interval_sec"] = HUB_CHART_POLL_INTERVAL
return result
class ChartWatchBody(BaseModel):
exchange_key: str = ""
symbol: str = ""
timeframe: str = "5m"
@app.post("/api/chart/watch")
async def api_chart_watch(body: ChartWatchBody = Body(...)):
ex_k = (body.exchange_key or "").strip().lower()
sym = (body.symbol or "").strip().upper()
tf = (body.timeframe or "5m").strip()
if not ex_k or not sym:
raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol")
if tf not in CHART_TIMEFRAMES:
raise HTTPException(status_code=400, detail="不支持的周期")
key = chart_poll_store.touch_watch(ex_k, sym, tf)
chart_poll_store.request_refresh()
return {
"ok": True,
"series_key": key,
"series_version": chart_poll_store.series_version(ex_k, sym, tf),
"chart_version": chart_poll_store.version,
"watch_ttl_sec": HUB_CHART_WATCH_TTL_SEC,
}
@app.post("/api/chart/unwatch")
async def api_chart_unwatch(body: ChartWatchBody = Body(...)):
chart_poll_store.clear_watch(body.exchange_key, body.symbol, body.timeframe)
return {"ok": True}
@app.get("/api/chart/stream")
async def api_chart_stream():
from fastapi.responses import StreamingResponse
return StreamingResponse(
chart_poll_store.iter_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.get("/api/chart/poll/meta")
async def api_chart_poll_meta():
return chart_poll_store.event_dict()
@app.get("/api/settings/meta")
def api_settings_meta():
po = public_origin()
return {
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN),
"capability_options": ["key", "trend"],
"public_origin": f"{po[0]}://{po[1]}" if po else None,
"public_origin_hint": (
"未设置 HUB_PUBLIC_ORIGIN 时,复盘链接若为 127.0.0.1,仅服务器本机浏览器可打开"
if not po
else "复盘/展示链接已替换为对外地址"
),
"password_required": password_required(),
}
async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict:
url = f"{ex['agent_url'].rstrip('/')}/status"
try:
r = await client.get(url, headers=_agent_headers(), timeout=HUB_AGENT_TIMEOUT)
body = r.json() if r.content else {}
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": r.status_code == 200,
"agent": body,
"error": body.get("error") if isinstance(body, dict) else None,
}
except Exception as e:
return {
"id": ex["id"],
"name": ex["name"],
"key": ex.get("key"),
"agent_url": ex["agent_url"],
"flask_url": ex.get("flask_url"),
"capabilities": ex.get("capabilities") or [],
"http_ok": False,
"error": str(e),
"agent": None,
}
def _parse_http_json_body(r: httpx.Response) -> dict:
text = (r.text or "").strip()
if not text:
return {"ok": False, "status": r.status_code, "text": "(empty body)"}
try:
data = r.json()
if isinstance(data, dict):
return data
return {"ok": False, "status": r.status_code, "text": text[:500]}
except Exception:
snippet = text[:500]
if snippet.lstrip().lower().startswith("<!") or "internal server error" in snippet.lower():
return {
"ok": False,
"status": r.status_code,
"messages": [f"实例返回 HTML 错误(HTTP {r.status_code}),请查看该 Flask 日志"],
"text": snippet,
}
return {"ok": False, "status": r.status_code, "messages": [snippet], "text": snippet}
async def _fetch_flask_json(
client: httpx.AsyncClient,
ex: dict,
path: str,
method: str = "GET",
data=None,
json_body: dict | None = None,
) -> dict | None:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return None
try:
if method == "GET":
r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=HUB_FLASK_TIMEOUT)
else:
headers = {**_hub_headers(), "Content-Type": "application/json"}
if json_body is not None:
r = await client.post(
f"{base}{path}", headers=headers, json=json_body, timeout=120.0
)
else:
r = await client.post(
f"{base}{path}", headers=headers, data=data, timeout=120.0
)
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
parsed.setdefault("status", r.status_code)
return parsed
return _parse_http_json_body(r)
except Exception as e:
return {"ok": False, "error": str(e)}
async def _notify_instance_user_close(
client: httpx.AsyncClient, ex: dict, *, count: int = 1
) -> dict | None:
"""登记实例侧用户主动平仓风控(中控点平仓/全平)。"""
if count <= 0 or not (ex.get("flask_url") or "").strip():
return None
return await _fetch_flask_json(
client,
ex,
"/api/hub/account-risk/user-close",
method="POST",
json_body={"source": "user_hub", "count": int(count)},
)
def _flask_error_from_hub_mon(hub_mon: dict | None) -> str | None:
if not isinstance(hub_mon, dict) or hub_mon.get("ok") is not False:
return None
st = hub_mon.get("status")
if st == 404:
return (
"HTTP 404:该 Flask 未注册 /api/hub/*hub_bridge 未加载)。"
"请在仓库根目录 git pull 后 pm2 restart crypto_binance crypto_gate crypto_gate_bot"
"并查看启动日志是否含 [hub_bridge] ImportError"
)
return (
hub_mon.get("msg")
or hub_mon.get("error")
or (f"HTTP {st}" if st else None)
or (str(hub_mon.get("text") or "")[:120] or None)
)
def _cond_order_trigger_key(price: object) -> str | None:
if price is None or price == "":
return None
try:
return f"{float(price):.12g}"
except (TypeError, ValueError):
return None
def _merge_conditional_orders_no_dup(
existing: list, extra: list
) -> list:
"""子代理已拉到的条件单与 Flask exchange_tpsl 合成行按触发价/订单号去重,避免 Gate 显示 4 笔实为 2 笔。"""
if not extra:
return list(existing) if existing else []
if not existing:
return list(extra)
triggers: set[str] = set()
order_ids: set[str] = set()
out: list = []
for row in existing:
if not isinstance(row, dict):
continue
out.append(row)
k = _cond_order_trigger_key(row.get("trigger_price"))
if k:
triggers.add(k)
oid = row.get("id")
if oid not in (None, ""):
order_ids.add(str(oid))
for row in extra:
if not isinstance(row, dict):
continue
k = _cond_order_trigger_key(row.get("trigger_price"))
oid = row.get("id")
if k and k in triggers:
continue
if oid not in (None, "") and str(oid) in order_ids:
continue
out.append(row)
if k:
triggers.add(k)
if oid not in (None, ""):
order_ids.add(str(oid))
return out
def _tpsl_slots_to_conditional_orders(exchange_tpsl: dict, symbol: str) -> list[dict]:
"""将实例 price_snapshot 的 exchange_tpsl 转为中控条件单结构。"""
out: list[dict] = []
if not isinstance(exchange_tpsl, dict):
return out
for role, label in (("sl", "止损"), ("tp", "止盈")):
slot = exchange_tpsl.get(role)
if not isinstance(slot, dict):
continue
trig = slot.get("trigger_price")
if trig is None:
continue
try:
trig_f = float(trig)
except (TypeError, ValueError):
continue
oid = slot.get("order_id")
out.append(
{
"id": str(oid) if oid is not None else "",
"symbol": symbol,
"channel": "algo",
"category": "conditional",
"label": f"{label} {trig_f:g}",
"trigger_price": trig_f,
"amount": slot.get("amount"),
"status": "open",
}
)
return out
def _exchange_tpsl_from_hub_order(hub_orders: list, symbol: str, side: str) -> dict | None:
"""趋势保本移交后:用下单监控计划价补全 exchange_tpsl(与实例页一致)。"""
side_l = (side or "").lower()
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(symbol, o_sym):
continue
if (o.get("direction") or "").lower() != side_l:
continue
sl = o.get("stop_loss")
tp = o.get("take_profit")
if sl in (None, "") and tp in (None, ""):
continue
slots: dict = {"sl": None, "tp": None}
if sl not in (None, ""):
try:
slots["sl"] = {"trigger_price": float(sl), "order_id": None}
except (TypeError, ValueError):
pass
if tp not in (None, ""):
try:
slots["tp"] = {"trigger_price": float(tp), "order_id": None}
except (TypeError, ValueError):
pass
if slots["sl"] or slots["tp"]:
return slots
return None
def _find_exchange_tpsl_for_position(
symbol: str,
side: str,
order_prices: list,
hub_orders: list,
) -> dict | None:
side_l = (side or "").lower()
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(symbol, o_sym):
continue
if (o.get("direction") or "").lower() != side_l:
continue
op = op_by_id.get(o.get("id"))
if not isinstance(op, dict):
continue
et = op.get("exchange_tpsl")
if isinstance(et, dict) and (et.get("sl") or et.get("tp")):
return et
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(symbol, op.get("symbol") or ""):
continue
et = op.get("exchange_tpsl")
if isinstance(et, dict) and (et.get("sl") or et.get("tp")):
return et
return None
def _merge_flask_order_price_fields(hub_mon: dict | None, snap: dict | None) -> None:
"""将 price_snapshot 中的快照盈亏比、已保本状态合并进 hub_monitor.orders。"""
if not isinstance(hub_mon, dict) or not isinstance(snap, dict):
return
order_prices = snap.get("order_prices") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
orders = hub_mon.get("orders") or []
if not isinstance(orders, list):
return
for o in orders:
if not isinstance(o, dict):
continue
op = op_by_id.get(o.get("id"))
if not isinstance(op, dict):
continue
if op.get("rr_ratio") is not None:
o["rr_ratio"] = op["rr_ratio"]
if "sl_breakeven_secured" in op:
o["sl_breakeven_secured"] = bool(op["sl_breakeven_secured"])
for key in (
"stop_loss",
"take_profit",
"stop_loss_display",
"take_profit_display",
"display_rr_ratio",
"exchange_initial_margin",
"plan_margin",
"time_close_enabled",
"time_close_hours",
"time_close_at_ms",
"time_close_label",
"time_close_countdown",
"time_close_remaining_sec",
):
if key in op and op[key] not in (None, ""):
o[key] = op[key]
def _merge_flask_position_breakeven(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None:
"""将 price_snapshot 的已保本状态同步到 agent 持仓,供中控首页表格展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict) or not isinstance(snap, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for p in positions:
if not isinstance(p, dict):
continue
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
matched = None
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(sym, o_sym):
continue
if (o.get("direction") or "").lower() != side:
continue
matched = op_by_id.get(o.get("id"))
break
if matched is None:
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(sym, op.get("symbol") or ""):
continue
matched = op
break
if isinstance(matched, dict) and "sl_breakeven_secured" in matched:
p["sl_breakeven_secured"] = bool(matched["sl_breakeven_secured"])
def _agent_position_has_mark(p: dict) -> bool:
try:
v = float(p.get("mark_price"))
return v > 0
except (TypeError, ValueError):
return False
def _apply_agent_mark_price(p: dict, mark_price: object, mark_display: object = None) -> None:
try:
mpf = float(mark_price)
except (TypeError, ValueError):
return
if mpf <= 0:
return
p["mark_price"] = mpf
disp = mark_display
if disp is not None and str(disp).strip() not in ("", "-"):
p["mark_price_fmt"] = str(disp)
def _find_matched_order_price_op(
p: dict,
order_prices: list,
hub_orders: list,
op_by_id: dict,
) -> dict | None:
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(sym, o_sym):
continue
if (o.get("direction") or "").lower() != side:
continue
matched = op_by_id.get(o.get("id"))
if isinstance(matched, dict):
return matched
break
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(sym, op.get("symbol") or ""):
continue
return op
return None
def _merge_flask_position_mark_price(
agent_row: dict, snap: dict | None, hub_mon: dict | None
) -> None:
"""子代理无标记价时,用实例 price_snapshot 的交易所标记价补全中控持仓展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict) or not isinstance(snap, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for p in positions:
if not isinstance(p, dict) or _agent_position_has_mark(p):
continue
matched = _find_matched_order_price_op(p, order_prices, hub_orders, op_by_id)
if isinstance(matched, dict):
_apply_agent_mark_price(
p,
matched.get("exchange_mark_price"),
matched.get("exchange_mark_price_display"),
)
position_marks = snap.get("position_marks") or []
if not isinstance(position_marks, list):
return
for p in positions:
if not isinstance(p, dict) or _agent_position_has_mark(p):
continue
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
for pm in position_marks:
if not isinstance(pm, dict):
continue
if not _symbols_match(sym, pm.get("symbol") or ""):
continue
if (pm.get("side") or "").lower() != side:
continue
_apply_agent_mark_price(
p, pm.get("mark_price"), pm.get("mark_price_display")
)
break
def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None:
"""子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
if not isinstance(snap, dict):
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
for p in positions:
if not isinstance(p, dict):
continue
sym = p.get("symbol") or ""
side = p.get("side") or ""
et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders)
if not et:
et = _exchange_tpsl_from_hub_order(hub_orders, sym, side)
if not et:
continue
p["exchange_tpsl"] = et
cond = p.get("conditional_orders") or []
merged = _tpsl_slots_to_conditional_orders(et, sym)
p["conditional_orders"] = _merge_conditional_orders_no_dup(cond, merged)
async def _fetch_exchange_flask_bundle(
client: httpx.AsyncClient, ex: dict
) -> tuple[dict | None, dict | None, list | None, dict | None, dict | None]:
"""单所 Flaskmonitor / meta / price_snapshot / account(有 flask_url 时)并行拉取。"""
caps = ex.get("capabilities") or []
tasks = [
_fetch_flask_json(client, ex, "/api/hub/monitor"),
_fetch_flask_json(client, ex, "/api/hub/meta"),
]
has_flask = bool((ex.get("flask_url") or "").strip())
if has_flask:
tasks.extend(
[
_fetch_flask_json(client, ex, "/api/price_snapshot"),
_fetch_flask_json(client, ex, "/api/hub/account"),
]
)
results = await asyncio.gather(*tasks)
hub_mon = results[0]
meta = results[1]
snap = results[2] if has_flask and len(results) > 2 else None
account = results[3] if has_flask and len(results) > 3 else None
key_prices = None
want_prices = HUB_BOARD_KEY_PRICES and "key" in caps
if want_prices and isinstance(snap, dict):
key_prices = snap.get("key_prices")
return (
hub_mon,
meta,
key_prices,
snap if isinstance(snap, dict) else None,
account if isinstance(account, dict) else None,
)
async def _assemble_board_row(
client: httpx.AsyncClient, ex: dict, agent_row: dict
) -> dict:
hub_mon, meta, key_prices, snap, account = await _fetch_exchange_flask_bundle(
client, ex
)
if isinstance(hub_mon, dict):
_merge_flask_order_price_fields(hub_mon, snap)
_merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
_merge_flask_position_breakeven(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
_merge_flask_position_mark_price(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False
acct_ok = isinstance(account, dict) and account.get("ok") is not False
raw_review = (ex.get("review_url") or "").strip()
review_link = browser_url(raw_review) if raw_review else default_review_url(
ex.get("flask_url")
)
return {
**agent_row,
"flask_url": ex.get("flask_url") or "",
"flask_url_browser": browser_url(ex.get("flask_url")),
"review_url": review_link,
"hub_monitor": hub_mon,
"flask_ok": flask_ok,
"flask_error": _flask_error_from_hub_mon(hub_mon if isinstance(hub_mon, dict) else None),
"meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta,
"key_prices": key_prices,
"funding_usdt": account.get("funding_usdt") if acct_ok else None,
"trading_usdt": account.get("trading_usdt") if acct_ok else None,
"available_trading_usdt": account.get("available_trading_usdt") if acct_ok else None,
"account_ok": acct_ok,
}
async def _build_monitor_board_payload() -> dict:
exchanges = enabled_exchanges()
async with httpx.AsyncClient() as client:
agent_rows = await asyncio.gather(
*[_fetch_agent_status(client, ex) for ex in exchanges]
)
out = await asyncio.gather(
*[
_assemble_board_row(client, ex, agent_row)
for ex, agent_row in zip(exchanges, agent_rows)
]
)
return {
"rows": list(out),
"updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"),
}
@app.get("/api/monitor/board")
@app.get("/api/monitor/board/snapshot")
async def api_monitor_board_snapshot():
"""读后台缓存快照;完整聚合由 hub 每 HUB_BOARD_POLL_INTERVAL 秒执行。"""
return board_store.snapshot_dict()
@app.get("/api/monitor/board/stream")
async def api_monitor_board_stream():
from fastapi.responses import StreamingResponse
return StreamingResponse(
board_store.iter_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.post("/api/monitor/board/refresh")
async def api_monitor_board_refresh():
_schedule_board_refresh()
return {"ok": True, "board_version": board_store.version}
@app.get("/api/host/status")
async def api_host_status():
from hub_host_status_lib import get_host_status
return await asyncio.to_thread(get_host_status)
def _require_hub_logged_in(request: Request) -> None:
if password_required() and not validate_session_token(request.cookies.get(SESSION_COOKIE)):
raise HTTPException(status_code=401, detail="未登录中控")
@app.get("/api/instance/open-url")
def api_instance_open_url(
request: Request,
exchange_id: str,
next: str = "/",
embed: str = "",
hub_theme: str = "",
):
"""已登录中控时生成实例 SSO 打开链接(2h 有效、单次使用,复用 HUB_BRIDGE_TOKEN)。"""
_require_hub_logged_in(request)
if not HUB_BRIDGE_TOKEN:
raise HTTPException(status_code=503, detail="未配置 HUB_BRIDGE_TOKEN,无法签发实例打开链接")
ex = _find_exchange(exchange_id)
if not ex:
raise HTTPException(status_code=404, detail="未知交易所 id")
base = browser_url((ex.get("flask_url") or "").strip()).rstrip("/")
if not base:
raise HTTPException(status_code=400, detail="该账户未配置 flask_url")
ex_key = (ex.get("key") or "").strip().lower()
if not ex_key:
raise HTTPException(status_code=400, detail="该账户缺少 key(用于 SSO 校验)")
nxt = safe_next_path(next)
token = mint_hub_sso_token(ex_key, nxt)
if not token:
raise HTTPException(status_code=503, detail="签发 SSO 失败")
params = {"token": token, "next": nxt}
if (embed or "").strip().lower() in ("1", "true", "yes", "on"):
params["embed"] = "1"
ht = (hub_theme or "").strip().lower()
if ht in ("light", "dark"):
params["hub_theme"] = ht
q = urlencode(params)
return {
"ok": True,
"url": f"{base}/hub-sso?{q}",
"expires_in": HUB_SSO_TTL_SEC,
"exchange_id": exchange_id,
"exchange_key": ex_key,
}
class CloseAllBody(BaseModel):
exclude_ids: list[str] = Field(default_factory=list)
class ClosePositionBody(BaseModel):
symbol: str
side: str
class CancelOrderBody(BaseModel):
symbol: str
order_id: str
channel: str = "regular"
class CancelSymbolOrdersBody(BaseModel):
symbol: str
scope: str = "all"
class PlaceTpslBody(BaseModel):
symbol: str
side: str
stop_loss: float
take_profit: float
contracts: float | None = None
class TrendPlanActionBody(BaseModel):
plan_id: int
breakeven_offset_pct: float | None = None
def _flask_hub_messages(parsed: dict | None) -> tuple[bool, str]:
if not isinstance(parsed, dict):
return False, "实例返回无效"
msgs = list(parsed.get("messages") or [])
if parsed.get("msg"):
msgs.insert(0, str(parsed["msg"]))
if parsed.get("error"):
msgs.append(str(parsed["error"]))
ok = parsed.get("ok") is not False
if parsed.get("ok") is True:
ok = True
elif parsed.get("ok") is False:
ok = False
else:
for m in msgs:
if any(
k in str(m)
for k in ("失败", "错误", "无法", "缺少", "过期", "未找到", "不允许", "异常")
):
ok = False
break
text = "".join(str(x) for x in msgs if x) or ("成功" if ok else "操作失败")
return ok, text
@app.post("/api/trend/{exchange_id}/stop")
async def api_trend_plan_stop(exchange_id: str, body: TrendPlanActionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
if "trend" not in (ex.get("capabilities") or []):
raise HTTPException(status_code=400, detail="该账户未启用趋势计划监控")
pid = int(body.plan_id)
async with httpx.AsyncClient() as client:
parsed = await _fetch_flask_json(
client, ex, f"/api/hub/trend/stop/{pid}", method="POST"
)
ok, text = _flask_hub_messages(parsed)
_schedule_board_refresh()
return {"ok": ok, "message": text, "payload": parsed}
@app.post("/api/trend/{exchange_id}/breakeven")
async def api_trend_plan_breakeven(exchange_id: str, body: TrendPlanActionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
if "trend" not in (ex.get("capabilities") or []):
raise HTTPException(status_code=400, detail="该账户未启用趋势计划监控")
pid = int(body.plan_id)
data = {}
if body.breakeven_offset_pct is not None:
data["breakeven_offset_pct"] = str(body.breakeven_offset_pct)
async with httpx.AsyncClient() as client:
parsed = await _fetch_flask_json(
client,
ex,
f"/api/hub/trend/breakeven/{pid}",
method="POST",
data=data,
)
ok, text = _flask_hub_messages(parsed)
_schedule_board_refresh()
return {"ok": ok, "message": text, "payload": parsed}
@app.post("/api/orders/{exchange_id}/cancel")
async def api_cancel_order(exchange_id: str, body: CancelOrderBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/cancel"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={
"symbol": body.symbol,
"order_id": body.order_id,
"channel": body.channel or "regular",
},
timeout=60.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/orders/{exchange_id}/cancel-symbol")
async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/cancel-symbol"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={"symbol": body.symbol, "scope": body.scope or "all"},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/close/{exchange_id}/position")
async def api_close_position(exchange_id: str, body: ClosePositionBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
sym = (body.symbol or "").strip()
side = (body.side or "").strip().lower()
if not sym:
raise HTTPException(status_code=400, detail="symbol 不能为空")
if side not in ("long", "short"):
raise HTTPException(status_code=400, detail="side 须为 long 或 short")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-position"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={"symbol": sym, "side": side},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
if out.get("ok"):
ex_key = (ex.get("key") or "").strip().lower()
async with httpx.AsyncClient() as flask_client:
if ex_key in ("gate", "gate_bot"):
order_sync = await _fetch_flask_json(
flask_client,
ex,
"/api/hub/order/sync-flat",
method="POST",
json_body={"symbol": sym, "side": side},
)
if isinstance(order_sync, dict):
out["order_sync"] = order_sync
if "trend" in (ex.get("capabilities") or []):
sync_parsed = await _fetch_flask_json(
flask_client,
ex,
"/api/hub/trend/sync-flat",
method="POST",
json_body={"symbol": sym, "side": side},
)
if isinstance(sync_parsed, dict):
out["trend_sync"] = sync_parsed
risk_sync = await _notify_instance_user_close(flask_client, ex, count=1)
if isinstance(risk_sync, dict):
out["risk_sync"] = risk_sync
_schedule_board_refresh()
return out
@app.post("/api/orders/{exchange_id}/place-tpsl")
async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/orders/place-tpsl"
async with httpx.AsyncClient() as client:
r = await client.post(
url,
headers=_agent_headers(),
json={
"symbol": body.symbol,
"side": body.side,
"stop_loss": body.stop_loss,
"take_profit": body.take_profit,
"contracts": body.contracts,
},
timeout=120.0,
)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/close/{exchange_id}")
async def api_close_exchange(exchange_id: str):
ex = _find_exchange(exchange_id)
if not ex or not ex.get("enabled"):
raise HTTPException(status_code=404, detail="账户未启用")
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
async with httpx.AsyncClient() as client:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
body = r.json()
except Exception:
body = {"raw": (r.text or "")[:2000]}
ok = bool(isinstance(body, dict) and body.get("ok"))
out = {"exchange": ex, "status_code": r.status_code, "payload": body, "ok": ok}
if ok and isinstance(body, dict):
closed = body.get("closed") or []
n = len(closed) if isinstance(closed, list) else 0
if n > 0:
risk_sync = await _notify_instance_user_close(client, ex, count=n)
if isinstance(risk_sync, dict):
out["risk_sync"] = risk_sync
_schedule_board_refresh()
return out
@app.post("/api/close-all")
async def api_close_all(body: CloseAllBody | None = Body(default=None)):
excl = set(body.exclude_ids if body else [])
excl |= env_force_disabled_ids()
targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl]
async with httpx.AsyncClient() as client:
async def one(ex: dict):
url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all"
try:
r = await client.post(url, headers=_agent_headers(), timeout=120.0)
try:
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
row = {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload}
if isinstance(payload, dict) and payload.get("ok"):
closed = payload.get("closed") or []
n = len(closed) if isinstance(closed, list) else 0
if n > 0:
risk_sync = await _notify_instance_user_close(client, ex, count=n)
if isinstance(risk_sync, dict):
row["risk_sync"] = risk_sync
return row
except Exception as e:
return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)}
results = await asyncio.gather(*[one(ex) for ex in targets])
_schedule_board_refresh()
return {"results": list(results)}
def _trade_removed_response():
"""旧版前端或缓存页面仍会请求 /api/trade/*,勿解析表单,直接返回说明。"""
return JSONResponse(
{
"ok": False,
"result": {
"ok": False,
"messages": [
"中控已移除下单区。请在监控卡片点击「实例」,"
"进入对应 crypto_monitor_* 网页添加关键位或下单。"
],
},
"deprecated": True,
},
status_code=410,
)
def _parse_anchor_ms(at: str = "", anchor_ms: str = "") -> int | None:
raw = (anchor_ms or at or "").strip()
if not raw:
return None
return parse_wall_clock_ms(raw)
@app.get("/api/archive/meta")
def api_archive_meta():
init_archive_db()
exchanges = []
for ex in enabled_exchanges(load_settings()):
exchanges.append(
{
"id": ex.get("id"),
"key": ex.get("key"),
"name": ex.get("name"),
}
)
return {
"ok": True,
"timeframes": sorted(ARCHIVE_TIMEFRAMES),
"default_timeframe": ARCHIVE_DEFAULT_TIMEFRAME,
"seed_lookback_days": ARCHIVE_SEED_LOOKBACK_DAYS,
"sync_interval_sec": ARCHIVE_SYNC_INTERVAL_SEC,
"visible_bars_default": ARCHIVE_VISIBLE_BARS_DEFAULT,
"exchanges": exchanges,
"last_sync": _last_archive_sync,
}
@app.get("/api/archive/list")
def api_archive_list(
exchange_key: str = "",
filter_profit: str = "",
filter_loss: str = "",
filter_sick: str = "",
filter_emotion: str = "",
):
init_archive_db()
rows = list_symbol_rows(
exchange_key=exchange_key,
filter_profit=(filter_profit or "").lower() in ("1", "true", "yes", "on"),
filter_loss=(filter_loss or "").lower() in ("1", "true", "yes", "on"),
filter_sick=(filter_sick or "").lower() in ("1", "true", "yes", "on"),
filter_emotion=(filter_emotion or "").lower() in ("1", "true", "yes", "on"),
)
return {"ok": True, "rows": rows, "count": len(rows)}
@app.get("/api/archive/daily-trades")
def api_archive_daily_trades(
period: str = "",
trading_day: str = "",
date_from: str = "",
date_to: str = "",
exchange_key: str = "",
filter_profit: str = "",
filter_loss: str = "",
filter_sick: str = "",
search: str = "",
):
init_archive_db()
payload = list_daily_trades(
trading_day=trading_day,
period=period or "today",
date_from=date_from,
date_to=date_to,
exchange_key=exchange_key,
filter_profit=(filter_profit or "").lower() in ("1", "true", "yes", "on"),
filter_loss=(filter_loss or "").lower() in ("1", "true", "yes", "on"),
filter_sick=(filter_sick or "").lower() in ("1", "true", "yes", "on"),
search=search,
)
return {"ok": True, **payload}
@app.get("/api/archive/quotes")
def api_archive_quotes():
init_archive_db()
rows = list_review_quotes()
return {"ok": True, "quotes": rows, "count": len(rows), "max": ARCHIVE_QUOTES_MAX}
class ArchiveQuoteBody(BaseModel):
quote_date: str = ""
content: str = ""
@app.post("/api/archive/quotes")
def api_archive_quote_create(body: ArchiveQuoteBody = Body(...)):
init_archive_db()
try:
row = create_review_quote(body.quote_date, body.content)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"ok": True, "quote": row}
@app.patch("/api/archive/quotes/{quote_id}")
def api_archive_quote_update(quote_id: int, body: ArchiveQuoteBody = Body(...)):
init_archive_db()
try:
row = update_review_quote(
int(quote_id),
quote_date=body.quote_date or None,
content=body.content if body.content is not None else None,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
if not row:
raise HTTPException(status_code=404, detail="语录不存在")
return {"ok": True, "quote": row}
@app.delete("/api/archive/quotes/{quote_id}")
def api_archive_quote_delete(quote_id: int):
init_archive_db()
if not delete_review_quote(int(quote_id)):
raise HTTPException(status_code=404, detail="语录不存在")
return {"ok": True, "id": int(quote_id)}
class MacroEventBody(BaseModel):
event_type: str = ""
event_at: str = ""
note: str = ""
@app.get("/api/macro-calendar/meta")
def api_macro_calendar_meta():
init_macro_calendar_db()
return {
"ok": True,
"event_types": [
{"id": k, "label": MACRO_EVENT_LABELS[k]} for k in MACRO_EVENT_TYPES
],
"window_before_minutes": 60,
"window_after_minutes": 60,
"timezone": "Asia/Shanghai",
}
@app.get("/api/macro-calendar/events")
def api_macro_calendar_events():
init_macro_calendar_db()
rows = list_macro_events()
return {"ok": True, "events": rows, "count": len(rows)}
@app.get("/api/macro-calendar/active")
def api_macro_calendar_active():
init_macro_calendar_db()
alerts = list_active_alerts()
return {"ok": True, "alerts": alerts, "count": len(alerts)}
@app.post("/api/macro-calendar/events")
def api_macro_calendar_create(body: MacroEventBody = Body(...)):
init_macro_calendar_db()
try:
row = create_macro_event(body.event_type, body.event_at, note=body.note)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"ok": True, "event": row}
@app.patch("/api/macro-calendar/events/{event_id}")
def api_macro_calendar_update(event_id: int, body: MacroEventBody = Body(...)):
init_macro_calendar_db()
try:
row = update_macro_event(
int(event_id),
event_type=body.event_type or None,
event_at=body.event_at or None,
note=body.note,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
if not row:
raise HTTPException(status_code=404, detail="记录不存在")
return {"ok": True, "event": row}
@app.delete("/api/macro-calendar/events/{event_id}")
def api_macro_calendar_delete(event_id: int):
init_macro_calendar_db()
if not delete_macro_event(int(event_id)):
raise HTTPException(status_code=404, detail="记录不存在")
return {"ok": True, "id": int(event_id)}
@app.get("/api/archive/detail")
def api_archive_detail(exchange_key: str = "", symbol: str = ""):
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
if not ex_k or not sym:
raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol")
init_archive_db()
trades = load_symbol_trades(ex_k, sym)
return {"ok": True, "exchange_key": ex_k, "symbol": sym, "trades": trades}
@app.get("/api/archive/ohlcv")
def api_archive_ohlcv(
exchange_key: str = "",
symbol: str = "",
timeframe: str = ARCHIVE_DEFAULT_TIMEFRAME,
mode: str = "hold",
anchor_ms: str = "",
opened_ms: str = "",
closed_ms: str = "",
range: str = "",
at: str = "",
bars: str = "",
):
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
if not ex_k or not sym:
raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol")
init_archive_db()
anchor = _parse_anchor_ms(at, anchor_ms)
open_ms = _parse_anchor_ms("", opened_ms)
close_ms = _parse_anchor_ms("", closed_ms)
try:
bar_n = int(bars) if (bars or "").strip().isdigit() else ARCHIVE_VISIBLE_BARS_DEFAULT
except ValueError:
bar_n = ARCHIVE_VISIBLE_BARS_DEFAULT
result = resolve_archive_chart(
ex_k,
sym,
timeframe,
anchor_ms=anchor,
opened_ms=open_ms,
closed_ms=close_ms,
mode=mode,
bars=bar_n,
range_mode=(range or "").strip().lower() or "window",
)
if not result.get("ok"):
raise HTTPException(status_code=404, detail=result.get("msg") or "无 K 线")
return result
class ArchiveOverlayBody(BaseModel):
behavior_tag: str = ""
note: str = ""
@app.patch("/api/archive/trade/{exchange_key}/{trade_id}")
def api_archive_trade_overlay(
exchange_key: str,
trade_id: int,
body: ArchiveOverlayBody = Body(...),
):
ex_k = (exchange_key or "").strip().lower()
if not ex_k:
raise HTTPException(status_code=400, detail="缺少 exchange_key")
init_archive_db()
out = upsert_trade_overlay(
ex_k,
int(trade_id),
behavior_tag=body.behavior_tag,
note=body.note,
)
return {"ok": True, "overlay": out}
@app.delete("/api/archive/trade/{exchange_key}/{trade_id}")
def api_archive_trade_delete(exchange_key: str, trade_id: int):
from hub_symbol_archive_lib import delete_trade_from_archive
ex_k = (exchange_key or "").strip().lower()
if not ex_k:
raise HTTPException(status_code=400, detail="缺少 exchange_key")
init_archive_db()
removed = delete_trade_from_archive(ex_k, int(trade_id))
if not removed:
raise HTTPException(status_code=404, detail="档案中无该笔交易")
return {"ok": True, "exchange_key": ex_k, "trade_id": int(trade_id)}
@app.post("/api/archive/sync")
async def api_archive_sync():
body = await _run_archive_sync_once()
return body
@app.get("/api/entry-plans/meta")
def api_entry_plans_meta():
init_entry_plan_db()
exchanges = []
for ex in enabled_exchanges(load_settings()):
exchanges.append(
{
"id": ex.get("id"),
"key": ex.get("key"),
"name": ex.get("name"),
}
)
return {"ok": True, **entry_plan_meta_payload(exchanges)}
@app.get("/api/entry-plans")
def api_entry_plans_list(status: str = "active"):
init_entry_plan_db()
try:
rows = list_entry_plans(status=status)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"ok": True, "plans": rows, "count": len(rows), "status": status.strip().lower()}
@app.get("/api/entry-plans/stats")
def api_entry_plan_stats(
dimension: str = "symbol",
period: str = "all",
date_from: str = "",
date_to: str = "",
):
init_entry_plan_db()
try:
stats = compute_entry_plan_stats(
dimension=dimension,
period=period,
date_from=date_from,
date_to=date_to,
)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"ok": True, "stats": stats}
@app.get("/api/entry-plans/{plan_id}")
def api_entry_plan_detail(plan_id: int):
init_entry_plan_db()
row = get_entry_plan(int(plan_id))
if not row:
raise HTTPException(status_code=404, detail="计划不存在")
return {"ok": True, "plan": row}
class EntryPlanBody(BaseModel):
plan_date: str = ""
exchange_key: str = ""
symbol: str = ""
plan_type: str = ""
trend_timeframe: str = ""
entry_timeframe: str = ""
direction: str = ""
target_level: str = ""
current_range: str = ""
entry_scheme: str = ""
result: str | None = None
pnl_amount: float | None = None
note: str = ""
@app.post("/api/entry-plans")
def api_entry_plan_create(body: EntryPlanBody = Body(...)):
init_entry_plan_db()
try:
row = create_entry_plan(body.model_dump(exclude_unset=True))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
return {"ok": True, "plan": row}
@app.patch("/api/entry-plans/{plan_id}")
def api_entry_plan_update(plan_id: int, body: EntryPlanBody = Body(...)):
init_entry_plan_db()
payload = body.model_dump(exclude_unset=True)
if not payload:
raise HTTPException(status_code=400, detail="无更新字段")
try:
row = update_entry_plan(int(plan_id), payload)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
if not row:
raise HTTPException(status_code=404, detail="计划不存在")
return {"ok": True, "plan": row}
@app.delete("/api/entry-plans/{plan_id}")
def api_entry_plan_delete(plan_id: int):
init_entry_plan_db()
try:
ok = delete_entry_plan(int(plan_id))
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e)) from e
if not ok:
raise HTTPException(status_code=404, detail="计划不存在或已归档")
return {"ok": True, "id": int(plan_id)}
@app.get("/api/hub/fund-overview")
def api_hub_fund_overview():
from hub_fund_history_lib import build_fund_overview
from hub_ai.config import trading_day_reset_hour
settings = load_settings()
snap = board_store.snapshot_dict()
payload = build_fund_overview(
settings.get("exchanges") or [],
board_rows=snap.get("rows") or [],
reset_hour=trading_day_reset_hour(),
updated_at=snap.get("updated_at"),
)
return payload
@app.get("/api/ping")
def api_ping():
return {
"ok": True,
"service": "manual-trading-hub",
"build": HUB_BUILD,
"trade_ui": False,
"features": ["monitor", "settings", "auth", "board_sse", "dashboard_sse", "archive", "dashboard", "funds", "macro_calendar"],
"board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL,
"board_version": board_store.version,
"board_aggregating": board_store.aggregating,
"board_updated_at": (board_store.payload or {}).get("updated_at")
if isinstance(board_store.payload, dict)
else None,
"board_error": board_store.last_error,
"dashboard_poll_interval_sec": DASHBOARD_POLL_INTERVAL_SEC,
"dashboard_version": dashboard_store.version,
"dashboard_aggregating": dashboard_store.aggregating,
"dashboard_updated_at": (dashboard_store.payload or {}).get("updated_at")
if isinstance(dashboard_store.payload, dict)
else None,
"dashboard_error": dashboard_store.last_error,
"password_required": password_required(),
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""),
}
@app.post("/api/trade/order/{exchange_id}")
@app.post("/api/trade/key/{exchange_id}")
@app.post("/api/trade/trend/preview/{exchange_id}")
@app.post("/api/trade/trend/execute/{exchange_id}")
async def api_trade_removed(exchange_id: str):
return _trade_removed_response()
@app.get("/api/trade/meta/{exchange_id}")
@app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}")
async def api_trade_removed_get(exchange_id: str, preview_id: str = ""):
return _trade_removed_response()
def main():
import uvicorn
print(
f"manual-trading-hub start build={HUB_BUILD} listen={HUB_HOST}:{HUB_PORT}",
flush=True,
)
uvicorn.run(app, host=HUB_HOST, port=HUB_PORT, log_level="info", access_log=False)
if __name__ == "__main__":
main()