""" 多账户交易中控:监控区 / 系统设置。 聚合各实例监控数据与子代理 /status;下单请在各实例网页操作。 """ from __future__ import annotations import asyncio import os import sys from contextlib import asynccontextmanager from pathlib import Path _REPO_ROOT = Path(__file__).resolve().parent.parent if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days from hub_ohlcv_lib import ( CHART_TIMEFRAME_ORDER, CHART_TIMEFRAMES, bar_limit_for_timeframe, chart_chunk_limit, chart_initial_limit, chart_memory_cap, retention_policy_meta, ) from hub_volume_rank_lib import ( TOP_N_DEFAULT, _exchange_rank_row_stale, cache_needs_refresh, format_volume_quote, get_cached_rank, load_volume_rank_cache, merge_exchange_rank, rank_date_label, save_volume_rank_cache, seconds_until_next_reset, volume_rank_reset_hour, ) from hub_symbol_archive_lib import ( ARCHIVE_DEFAULT_TIMEFRAME, ARCHIVE_SEED_LOOKBACK_DAYS, ARCHIVE_SYNC_INTERVAL_SEC, ARCHIVE_TIMEFRAMES, ARCHIVE_TRADE_DAYS, ARCHIVE_TRADE_LIMIT, ARCHIVE_VISIBLE_BARS_DEFAULT, init_db as init_archive_db, list_symbol_rows, load_symbol_trades, parse_wall_clock_ms, resolve_archive_chart, sync_exchange_symbol_archives, upsert_trade_overlay, ) from env_load import load_hub_dotenv load_hub_dotenv() import httpx from fastapi import Body, FastAPI, HTTPException, Request from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, Field from settings_store import ( enabled_exchanges, env_force_disabled_ids, load_settings, save_settings, ) from hub_web_auth import ( SESSION_COOKIE, SESSION_MAX_AGE_SEC, clear_session_cookie, cookie_secure_for_request, create_session_token, embed_allowed, embed_frame_ancestors, is_public_path, password_required, set_session_cookie, validate_session_token, expected_username, verify_credentials, ) from hub_sso import HUB_SSO_TTL_SEC, mint_hub_sso_token, safe_next_path from url_public import browser_url, default_review_url, public_origin from urllib.parse import urlencode from hub_board_cache import HUB_BOARD_POLL_INTERVAL, board_store from hub_chart_cache import ( HUB_CHART_POLL_INTERVAL, HUB_CHART_WATCH_TTL_SEC, chart_poll_store, parse_series_key, ) try: from exchange_orders import symbols_match as _symbols_match except ImportError: def _symbols_match(position_symbol: str, order_symbol: str) -> bool: a = (position_symbol or "").strip().upper() b = (order_symbol or "").strip().upper() return bool(a and b and a == b) HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0") HUB_PORT = int(os.getenv("HUB_PORT", "5100")) HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip() _trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower() HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off") _allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower() # 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护 HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on") DIR = Path(__file__).resolve().parent HUB_BUILD = "20260607-hub-archive" _archive_sync_stop: asyncio.Event | None = None _archive_sync_task: asyncio.Task | None = None _last_archive_sync: dict | None = None _volume_rank_stop: asyncio.Event | None = None _volume_rank_task: asyncio.Task | None = None _volume_rank_cache: dict | None = None HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8")) HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10")) HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45")) _board_key_prices_raw = (os.getenv("HUB_BOARD_KEY_PRICES", "true") or "").strip().lower() HUB_BOARD_KEY_PRICES = _board_key_prices_raw in ("1", "true", "yes", "on") def _is_local(host: str | None) -> bool: if not host: return False h = host.lower() return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1") def _ipv4_rfc1918_private(host: str) -> bool: h = host.lower() if h.startswith("::ffff:"): h = h[7:] parts = h.split(".") if len(parts) != 4: return False try: a, b, c, d = (int(x) for x in parts) except ValueError: return False if any(x < 0 or x > 255 for x in (a, b, c, d)): return False if a == 10: return True if a == 172 and 16 <= b <= 31: return True if a == 192 and b == 168: return True return False def _client_allowed(host: str | None) -> bool: if _is_local(host): return True if HUB_TRUST_LAN and host and _ipv4_rfc1918_private(host): return True return False def _hub_headers() -> dict[str, str]: if not HUB_BRIDGE_TOKEN: return {} return {"X-Hub-Token": HUB_BRIDGE_TOKEN} def _agent_headers() -> dict[str, str]: if not HUB_BRIDGE_TOKEN: return {} return {"X-Control-Token": HUB_BRIDGE_TOKEN} def _find_exchange(ex_id: str) -> dict | None: for ex in load_settings().get("exchanges") or []: if str(ex.get("id")) == str(ex_id): return ex return None async def _run_chart_poll() -> dict: keys = chart_poll_store.active_series_keys() if not keys: return {"ok": True, "series_count": 0, "polled": 0} polled = 0 errors: list[str] = [] for key in keys: parsed = parse_series_key(key) if not parsed: continue ex_k, sym, tf = parsed ex = _find_exchange_by_key(ex_k) if not ex or not ex.get("enabled"): continue ex_ref = ex sym_ref = sym tf_ref = tf def remote_fetch(**kwargs) -> dict: tf_use = kwargs.get("timeframe") or tf_ref return _fetch_instance_ohlcv_sync( ex_ref, symbol=kwargs.get("symbol") or sym_ref, timeframe=tf_use, since_ms=kwargs.get("since_ms"), limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)), ) try: result = await asyncio.to_thread( resolve_chart_bars, ex_k, sym, tf, remote_fetch, force_refresh=False, tail_refresh=True, ) polled += 1 chart_poll_store.note_series_result( ex_k, sym, tf, ok=bool(result.get("ok")), fetched=int(result.get("fetched") or 0), error=None if result.get("ok") else str(result.get("msg") or "poll_failed"), candles=result.get("candles") if result.get("ok") else None, price_tick=result.get("price_tick"), ) if not result.get("ok"): errors.append(f"{key}:{result.get('msg')}") except Exception as e: chart_poll_store.note_series_result(ex_k, sym, tf, ok=False, error=str(e)) errors.append(f"{key}:{e}") out: dict = {"ok": True, "series_count": len(keys), "polled": polled} if errors: out["errors"] = errors[:8] return out async def _run_board_aggregate() -> dict: try: body = await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT) return {"ok": True, **body} except asyncio.TimeoutError: return { "ok": False, "rows": [], "error": "board_timeout", "msg": ( f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。" "请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT" ), "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"), } def _schedule_board_refresh() -> None: board_store.request_refresh() async def _run_archive_sync_once() -> dict: global _last_archive_sync init_archive_db() settings = load_settings() targets = enabled_exchanges(settings) results: list[dict] = [] for ex in targets: ex_key = str(ex.get("key") or "").strip().lower() if not ex_key: continue trades_resp = await asyncio.to_thread( _fetch_instance_trades_archive_sync, ex, days=ARCHIVE_TRADE_DAYS, limit=ARCHIVE_TRADE_LIMIT, ) if not trades_resp.get("ok"): st = trades_resp.get("status") msg = ( trades_resp.get("msg") or trades_resp.get("error") or trades_resp.get("detail") or "拉取交易失败" ) if st == 404: msg = ( "HTTP 404:该 Flask 未注册 /api/hub/trades/archive。" "请在仓库根目录 git pull 后 pm2 restart crypto_gate crypto_gate_bot" ) results.append( { "exchange_key": ex_key, "name": ex.get("name"), "ok": False, "status": st, "msg": msg, } ) continue trades = trades_resp.get("trades") or [] for t in trades: if isinstance(t, dict): t.setdefault("exchange_key", ex_key) def remote_fetch(**kwargs): return _fetch_instance_ohlcv_sync( ex, symbol=kwargs.get("symbol") or "", timeframe=kwargs.get("timeframe") or "5m", since_ms=kwargs.get("since_ms"), limit=int(kwargs.get("limit") or 500), ) r = await asyncio.to_thread( sync_exchange_symbol_archives, ex_key, trades, remote_fetch, ) r["name"] = ex.get("name") r["trade_count"] = len(trades) results.append(r) out = { "ok": True, "exchanges": len(targets), "results": results, "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"), } _last_archive_sync = out return out def _fetch_instance_volume_rank_sync(ex: dict, *, top_n: int = TOP_N_DEFAULT) -> dict: base = (ex.get("flask_url") or "").rstrip("/") if not base: return {"ok": False, "msg": "未配置 flask_url"} params = {"top": str(int(top_n))} url = f"{base}/api/hub/volume-rank?{urlencode(params)}" try: with httpx.Client(timeout=max(HUB_FLASK_TIMEOUT, 120.0)) as client: r = client.get(url, headers=_hub_headers()) if r.status_code >= 400: parsed = _parse_http_json_body(r) parsed.setdefault("ok", False) parsed.setdefault("status", r.status_code) return parsed data = r.json() if r.content else {} return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"} except Exception as e: return {"ok": False, "msg": str(e)} def _get_volume_rank_cache() -> dict: global _volume_rank_cache if _volume_rank_cache is None: _volume_rank_cache = load_volume_rank_cache() return _volume_rank_cache def _refresh_volume_ranks(*, force: bool = False) -> dict: global _volume_rank_cache expected = rank_date_label() cache = _get_volume_rank_cache() targets = enabled_exchanges(load_settings()) required_keys = [ str(ex.get("key") or "").strip().lower() for ex in targets if ex.get("enabled") and str(ex.get("key") or "").strip() ] if not force and not cache_needs_refresh( cache, expected_rank_date=expected, required_keys=required_keys ): return { "ok": True, "skipped": True, "rank_date": cache.get("rank_date"), "updated_at": cache.get("updated_at"), } errors: list[str] = [] for ex in targets: ex_key = str(ex.get("key") or "").strip().lower() if not ex_key or not ex.get("enabled"): continue resp = _fetch_instance_volume_rank_sync(ex, top_n=TOP_N_DEFAULT) if resp.get("ok") and resp.get("items"): cache = merge_exchange_rank(cache, ex_key, resp) else: msg = str(resp.get("msg") or resp.get("error") or "拉取失败") if resp.get("ok") and not resp.get("items"): msg = msg if msg != "拉取失败" else "无有效成交额数据" errors.append(f"{ex_key}:{msg}") exchanges = dict(cache.get("exchanges") or {}) prev = dict(exchanges.get(ex_key) or {}) prev["error"] = msg if not prev.get("items"): prev["items"] = [] exchanges[ex_key] = prev cache["exchanges"] = exchanges cache["rank_date"] = expected save_volume_rank_cache(cache) _volume_rank_cache = cache out: dict = { "ok": True, "rank_date": expected, "exchanges": len(targets), "updated_at": cache.get("updated_at"), } if errors: out["errors"] = errors[:8] return out async def _volume_rank_loop() -> None: global _volume_rank_stop stop = _volume_rank_stop if stop is None: return try: await asyncio.to_thread(_refresh_volume_ranks, force=False) except Exception: pass while not stop.is_set(): try: wait_sec = seconds_until_next_reset() await asyncio.wait_for(stop.wait(), timeout=wait_sec) break except asyncio.TimeoutError: pass if stop.is_set(): break try: await asyncio.to_thread(_refresh_volume_ranks, force=True) except Exception: pass async def _archive_sync_loop() -> None: global _archive_sync_stop stop = _archive_sync_stop if stop is None: return init_archive_db() while not stop.is_set(): try: await _run_archive_sync_once() except Exception: pass try: await asyncio.wait_for(stop.wait(), timeout=float(ARCHIVE_SYNC_INTERVAL_SEC)) except asyncio.TimeoutError: pass @asynccontextmanager async def _hub_lifespan(_app: FastAPI): global _archive_sync_stop, _archive_sync_task, _volume_rank_stop, _volume_rank_task await board_store.start(_run_board_aggregate) await chart_poll_store.start(_run_chart_poll) _archive_sync_stop = asyncio.Event() _archive_sync_task = asyncio.create_task(_archive_sync_loop(), name="hub-archive-sync") _volume_rank_stop = asyncio.Event() _volume_rank_task = asyncio.create_task(_volume_rank_loop(), name="hub-volume-rank") try: yield finally: if _archive_sync_stop: _archive_sync_stop.set() if _archive_sync_task: _archive_sync_task.cancel() try: await _archive_sync_task except asyncio.CancelledError: pass _archive_sync_task = None _archive_sync_stop = None if _volume_rank_stop: _volume_rank_stop.set() if _volume_rank_task: _volume_rank_task.cancel() try: await _volume_rank_task except asyncio.CancelledError: pass _volume_rank_task = None _volume_rank_stop = None await chart_poll_store.stop() await board_store.stop() app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None, lifespan=_hub_lifespan) STATIC_DIR = DIR / "static" _REPO_STATIC = _REPO_ROOT / "static" _AI_REVIEW_RENDER_JS = _REPO_STATIC / "ai_review_render.js" @app.get("/assets/ai_review_render.js") def hub_ai_review_render_js(): """与四所实例共用仓库根 static/ai_review_render.js(须在 /assets mount 之前注册)。""" if not _AI_REVIEW_RENDER_JS.is_file(): raise HTTPException(status_code=404, detail="ai_review_render.js not found") return FileResponse( str(_AI_REVIEW_RENDER_JS), media_type="application/javascript; charset=utf-8", ) if STATIC_DIR.is_dir(): app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets") @app.middleware("http") async def local_only(request: Request, call_next): if HUB_ALLOW_PUBLIC: return await call_next(request) peer = request.client.host if request.client else None if not _client_allowed(peer): return JSONResponse({"detail": "forbidden"}, status_code=403) return await call_next(request) @app.middleware("http") async def embed_frame_headers(request: Request, call_next): response = await call_next(request) if embed_allowed(): ancestors = embed_frame_ancestors() if ancestors == "*": response.headers["Content-Security-Policy"] = "frame-ancestors *" else: response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {ancestors}" return response @app.middleware("http") async def hub_password_gate(request: Request, call_next): if not password_required(): return await call_next(request) path = request.url.path if is_public_path(path, request.method): return await call_next(request) token = request.cookies.get(SESSION_COOKIE) if validate_session_token(token): return await call_next(request) if path.startswith("/api/"): return JSONResponse({"detail": "未登录", "login_required": True}, status_code=401) from fastapi.responses import RedirectResponse nxt = path if path.startswith("/") else "/monitor" return RedirectResponse(f"/login?next={nxt}", status_code=302) def _shell_page(): index = STATIC_DIR / "index.html" if not index.is_file(): return JSONResponse({"detail": "missing static/index.html"}, status_code=500) return FileResponse(index) def _login_page(): login = STATIC_DIR / "login.html" if not login.is_file(): return JSONResponse({"detail": "missing static/login.html"}, status_code=500) return FileResponse(login) class LoginBody(BaseModel): username: str = "" password: str = "" @app.get("/api/auth/status") def api_auth_status(request: Request): required = password_required() logged_in = not required or validate_session_token(request.cookies.get(SESSION_COOKIE)) return { "required": required, "logged_in": logged_in, "username_hint": expected_username() if required else None, } @app.post("/api/auth/login") def api_auth_login(body: LoginBody, request: Request): if not password_required(): return {"ok": True, "auth_disabled": True} if not verify_credentials(body.username, body.password): raise HTTPException(status_code=401, detail="用户名或密码错误") token = create_session_token(body.username) embed = (request.headers.get("x-hub-embed") or "").strip() == "1" resp = JSONResponse({"ok": True, "session_token": token, "embed": embed}) set_session_cookie(resp, request, token, embed=embed) return resp @app.get("/embed-auth") def embed_auth_login(request: Request, token: str = "", next: str = "/monitor"): """ 嵌入式打开:父页跨域 fetch 登录时 Cookie 可能写不进 iframe, 用 session_token 在本页做一次导航,在 iframe 内写入 hub_sess。 """ from fastapi.responses import RedirectResponse dest = safe_next_path(next) if not password_required(): return RedirectResponse(dest, status_code=302) if not validate_session_token(token): q = urlencode({"next": dest, "embed": "1"}) return RedirectResponse(f"/login?{q}", status_code=302) resp = RedirectResponse(dest, status_code=302) set_session_cookie(resp, request, token, embed=True) return resp @app.post("/api/auth/logout") def api_auth_logout(request: Request): embed = (request.headers.get("x-hub-embed") or "").strip() == "1" resp = JSONResponse({"ok": True}) clear_session_cookie(resp, request, embed=embed) return resp @app.get("/login") def login_page(): return _login_page() @app.get("/") def root_redirect(): from fastapi.responses import RedirectResponse return RedirectResponse("/monitor") @app.get("/monitor") @app.get("/market") @app.get("/archive") @app.get("/ai") @app.get("/settings") def shell_pages(): return _shell_page() def _all_exchanges_for_ai() -> list: """AI 聚合用:含未启用账户(标记未监控)。""" return list(load_settings().get("exchanges") or []) from hub_ai.routes import create_hub_ai_router app.include_router(create_hub_ai_router(load_all_exchanges=_all_exchanges_for_ai)) @app.get("/trade") def trade_removed_redirect(): from fastapi.responses import RedirectResponse return RedirectResponse("/monitor", status_code=302) @app.get("/api/settings") def api_get_settings(): return load_settings() class SettingsBody(BaseModel): exchanges: list[dict] = Field(default_factory=list) @app.post("/api/settings") def api_save_settings(body: SettingsBody): force_off = env_force_disabled_ids() to_save = [] for ex in body.exchanges: row = dict(ex) eid = str(row.get("id", "")).strip() if eid in force_off: row["enabled"] = False row.pop("env_disabled", None) to_save.append(row) save_settings({"version": 1, "exchanges": to_save}) return {"ok": True, "settings": load_settings()} def _find_exchange_by_key(exchange_key: str) -> dict | None: key = (exchange_key or "").strip().lower() if not key: return None for ex in load_settings().get("exchanges") or []: if str(ex.get("key") or "").strip().lower() == key: return ex if str(ex.get("id") or "").strip() == exchange_key.strip(): return ex return None def _fetch_instance_trades_archive_sync( ex: dict, *, days: int = 365, limit: int = 2000, ) -> dict: base = (ex.get("flask_url") or "").rstrip("/") if not base: return {"ok": False, "msg": "未配置 flask_url"} params = {"days": str(int(days)), "limit": str(int(limit))} url = f"{base}/api/hub/trades/archive?{urlencode(params)}" try: with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client: r = client.get(url, headers=_hub_headers()) if r.status_code >= 400: parsed = _parse_http_json_body(r) parsed.setdefault("ok", False) parsed.setdefault("status", r.status_code) return parsed data = r.json() if r.content else {} if isinstance(data, dict): data.setdefault("ok", True) return data return {"ok": False, "msg": "无效 JSON"} except Exception as e: return {"ok": False, "msg": str(e)} def _fetch_instance_ohlcv_sync( ex: dict, *, symbol: str, timeframe: str, since_ms: int | None, limit: int, ) -> dict: base = (ex.get("flask_url") or "").rstrip("/") if not base: return {"ok": False, "msg": "未配置 flask_url"} params = {"symbol": symbol, "timeframe": timeframe, "limit": str(int(limit))} if since_ms is not None and int(since_ms) > 0: params["since_ms"] = str(int(since_ms)) url = f"{base}/api/hub/ohlcv?{urlencode(params)}" try: with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client: r = client.get(url, headers=_hub_headers()) if r.status_code >= 400: parsed = _parse_http_json_body(r) parsed.setdefault("ok", False) return parsed data = r.json() if r.content else {} return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"} except Exception as e: return {"ok": False, "msg": str(e)} @app.get("/api/chart/meta") def api_chart_meta(): tfs = [tf for tf in CHART_TIMEFRAME_ORDER if tf in CHART_TIMEFRAMES] exchanges = [] for ex in enabled_exchanges(load_settings()): exchanges.append( { "id": ex.get("id"), "key": ex.get("key"), "name": ex.get("name"), } ) return { "ok": True, "timeframes": [tf for tf in tfs if tf in CHART_TIMEFRAMES], "retention_days": retention_days(), "retention_policy": retention_policy_meta(), "limits": {tf: bar_limit_for_timeframe(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, "initial_limits": {tf: chart_initial_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, "chunk_limits": {tf: chart_chunk_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, "memory_caps": {tf: chart_memory_cap(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, "exchanges": exchanges, "volume_rank_top_n": TOP_N_DEFAULT, "volume_rank_reset_hour": volume_rank_reset_hour(), } @app.get("/api/chart/volume-rank") def api_chart_volume_rank(exchange_key: str = "", refresh: str = ""): force = (refresh or "").strip().lower() in ("1", "true", "yes", "on") if force: result = _refresh_volume_ranks(force=True) if not result.get("ok"): raise HTTPException(status_code=502, detail=result.get("msg") or "刷新失败") cache = _get_volume_rank_cache() ex_k = (exchange_key or "").strip().lower() targets = enabled_exchanges(load_settings()) required_keys = [ str(ex.get("key") or "").strip().lower() for ex in targets if ex.get("enabled") and str(ex.get("key") or "").strip() ] need_keys = [ex_k] if ex_k else required_keys if cache_needs_refresh(cache, required_keys=need_keys): _refresh_volume_ranks(force=True) cache = _get_volume_rank_cache() elif ex_k: row = (cache.get("exchanges") or {}).get(ex_k) or {} if _exchange_rank_row_stale(row): _refresh_volume_ranks(force=True) cache = _get_volume_rank_cache() if ex_k: ex = _find_exchange_by_key(ex_k) if not ex: raise HTTPException(status_code=400, detail="交易所不存在") payload = get_cached_rank(cache, ex_k, top_n=TOP_N_DEFAULT) payload["items"] = [ {**row, "volume_label": format_volume_quote(row.get("volume_quote"))} for row in payload.get("items") or [] ] payload["reset_hour"] = volume_rank_reset_hour() err = ((cache.get("exchanges") or {}).get(ex_k) or {}).get("error") if err and not payload.get("items"): payload["ok"] = False payload["msg"] = err return payload exchanges_out = {} for ex in enabled_exchanges(load_settings()): key = str(ex.get("key") or "").strip().lower() if not key: continue row = get_cached_rank(cache, key, top_n=TOP_N_DEFAULT) row["name"] = ex.get("name") row["items"] = [ {**item, "volume_label": format_volume_quote(item.get("volume_quote"))} for item in row.get("items") or [] ] exchanges_out[key] = row return { "ok": True, "rank_date": cache.get("rank_date"), "updated_at": cache.get("updated_at"), "reset_hour": volume_rank_reset_hour(), "exchanges": exchanges_out, } @app.post("/api/chart/volume-rank/refresh") async def api_chart_volume_rank_refresh(): result = await asyncio.to_thread(_refresh_volume_ranks, force=True) if not result.get("ok"): raise HTTPException(status_code=502, detail=result.get("msg") or "刷新失败") return result @app.get("/api/chart/ohlcv") def api_chart_ohlcv( exchange_key: str = "", symbol: str = "", timeframe: str = "1d", refresh: str = "", tail: str = "", limit: int = 0, before_ms: str = "", ): ex = _find_exchange_by_key(exchange_key) if not ex: raise HTTPException(status_code=400, detail="交易所不存在") if not ex.get("enabled"): raise HTTPException(status_code=400, detail="该交易所未启用") sym = (symbol or "").strip().upper() if not sym: raise HTTPException(status_code=400, detail="请输入币种") ex_key = str(ex.get("key") or "").strip().lower() force = (refresh or "").strip().lower() in ("1", "true", "yes", "on") tail_refresh = (tail or "").strip().lower() in ("1", "true", "yes", "on") lim = int(limit) if int(limit or 0) > 0 else None bms_raw = (before_ms or "").strip() bms = None if bms_raw: try: bms = int(bms_raw) except ValueError: raise HTTPException(status_code=400, detail="before_ms 无效") clear_db = force and not tail_refresh and bms is None def remote_fetch(**kwargs): tf_use = kwargs.get("timeframe") or timeframe return _fetch_instance_ohlcv_sync( ex, symbol=kwargs.get("symbol") or sym, timeframe=tf_use, since_ms=kwargs.get("since_ms"), limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)), ) result = resolve_chart_bars( ex_key, sym, timeframe, remote_fetch, force_refresh=force, tail_refresh=tail_refresh, clear_db=clear_db, limit=lim, before_ms=bms, ) if not result.get("ok"): raise HTTPException(status_code=502, detail=result.get("msg") or "K线加载失败") if not result.get("candles") and result.get("before_ms") is None: raise HTTPException(status_code=502, detail=result.get("msg") or "无 K 线") tick = result.get("price_tick") last = result["candles"][-1] if result.get("candles") else None result["ohlcv"] = format_ohlcv_detail( { "open": last.get("open") if last else None, "high": last.get("high") if last else None, "low": last.get("low") if last else None, "close": last.get("close") if last else None, "volume": last.get("volume") if last else None, } if last else None, tick, ) result["chart_version"] = chart_poll_store.version result["series_version"] = chart_poll_store.series_version(ex_key, sym, timeframe) result["chart_poll_interval_sec"] = HUB_CHART_POLL_INTERVAL return result class ChartWatchBody(BaseModel): exchange_key: str = "" symbol: str = "" timeframe: str = "5m" @app.post("/api/chart/watch") async def api_chart_watch(body: ChartWatchBody = Body(...)): ex_k = (body.exchange_key or "").strip().lower() sym = (body.symbol or "").strip().upper() tf = (body.timeframe or "5m").strip() if not ex_k or not sym: raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol") if tf not in CHART_TIMEFRAMES: raise HTTPException(status_code=400, detail="不支持的周期") key = chart_poll_store.touch_watch(ex_k, sym, tf) chart_poll_store.request_refresh() return { "ok": True, "series_key": key, "series_version": chart_poll_store.series_version(ex_k, sym, tf), "chart_version": chart_poll_store.version, "watch_ttl_sec": HUB_CHART_WATCH_TTL_SEC, } @app.post("/api/chart/unwatch") async def api_chart_unwatch(body: ChartWatchBody = Body(...)): chart_poll_store.clear_watch(body.exchange_key, body.symbol, body.timeframe) return {"ok": True} @app.get("/api/chart/stream") async def api_chart_stream(): from fastapi.responses import StreamingResponse return StreamingResponse( chart_poll_store.iter_sse(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) @app.get("/api/chart/poll/meta") async def api_chart_poll_meta(): return chart_poll_store.event_dict() @app.get("/api/settings/meta") def api_settings_meta(): po = public_origin() return { "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_bridge_token_set": bool(HUB_BRIDGE_TOKEN), "capability_options": ["key", "trend"], "public_origin": f"{po[0]}://{po[1]}" if po else None, "public_origin_hint": ( "未设置 HUB_PUBLIC_ORIGIN 时,复盘链接若为 127.0.0.1,仅服务器本机浏览器可打开" if not po else "复盘/展示链接已替换为对外地址" ), "password_required": password_required(), } async def _fetch_agent_status(client: httpx.AsyncClient, ex: dict) -> dict: url = f"{ex['agent_url'].rstrip('/')}/status" try: r = await client.get(url, headers=_agent_headers(), timeout=HUB_AGENT_TIMEOUT) body = r.json() if r.content else {} return { "id": ex["id"], "name": ex["name"], "key": ex.get("key"), "agent_url": ex["agent_url"], "flask_url": ex.get("flask_url"), "capabilities": ex.get("capabilities") or [], "http_ok": r.status_code == 200, "agent": body, "error": body.get("error") if isinstance(body, dict) else None, } except Exception as e: return { "id": ex["id"], "name": ex["name"], "key": ex.get("key"), "agent_url": ex["agent_url"], "flask_url": ex.get("flask_url"), "capabilities": ex.get("capabilities") or [], "http_ok": False, "error": str(e), "agent": None, } def _parse_http_json_body(r: httpx.Response) -> dict: text = (r.text or "").strip() if not text: return {"ok": False, "status": r.status_code, "text": "(empty body)"} try: data = r.json() if isinstance(data, dict): return data return {"ok": False, "status": r.status_code, "text": text[:500]} except Exception: snippet = text[:500] if snippet.lstrip().lower().startswith(" dict | None: base = (ex.get("flask_url") or "").rstrip("/") if not base: return None try: if method == "GET": r = await client.get(f"{base}{path}", headers=_hub_headers(), timeout=HUB_FLASK_TIMEOUT) else: headers = {**_hub_headers(), "Content-Type": "application/json"} if json_body is not None: r = await client.post( f"{base}{path}", headers=headers, json=json_body, timeout=120.0 ) else: r = await client.post( f"{base}{path}", headers=headers, data=data, timeout=120.0 ) if r.status_code >= 400: parsed = _parse_http_json_body(r) parsed.setdefault("ok", False) parsed.setdefault("status", r.status_code) return parsed return _parse_http_json_body(r) except Exception as e: return {"ok": False, "error": str(e)} def _flask_error_from_hub_mon(hub_mon: dict | None) -> str | None: if not isinstance(hub_mon, dict) or hub_mon.get("ok") is not False: return None st = hub_mon.get("status") if st == 404: return ( "HTTP 404:该 Flask 未注册 /api/hub/*(hub_bridge 未加载)。" "请在仓库根目录 git pull 后 pm2 restart crypto_binance crypto_gate crypto_gate_bot," "并查看启动日志是否含 [hub_bridge] ImportError" ) return ( hub_mon.get("msg") or hub_mon.get("error") or (f"HTTP {st}" if st else None) or (str(hub_mon.get("text") or "")[:120] or None) ) def _cond_order_trigger_key(price: object) -> str | None: if price is None or price == "": return None try: return f"{float(price):.12g}" except (TypeError, ValueError): return None def _merge_conditional_orders_no_dup( existing: list, extra: list ) -> list: """子代理已拉到的条件单与 Flask exchange_tpsl 合成行按触发价/订单号去重,避免 Gate 显示 4 笔实为 2 笔。""" if not extra: return list(existing) if existing else [] if not existing: return list(extra) triggers: set[str] = set() order_ids: set[str] = set() out: list = [] for row in existing: if not isinstance(row, dict): continue out.append(row) k = _cond_order_trigger_key(row.get("trigger_price")) if k: triggers.add(k) oid = row.get("id") if oid not in (None, ""): order_ids.add(str(oid)) for row in extra: if not isinstance(row, dict): continue k = _cond_order_trigger_key(row.get("trigger_price")) oid = row.get("id") if k and k in triggers: continue if oid not in (None, "") and str(oid) in order_ids: continue out.append(row) if k: triggers.add(k) if oid not in (None, ""): order_ids.add(str(oid)) return out def _tpsl_slots_to_conditional_orders(exchange_tpsl: dict, symbol: str) -> list[dict]: """将实例 price_snapshot 的 exchange_tpsl 转为中控条件单结构。""" out: list[dict] = [] if not isinstance(exchange_tpsl, dict): return out for role, label in (("sl", "止损"), ("tp", "止盈")): slot = exchange_tpsl.get(role) if not isinstance(slot, dict): continue trig = slot.get("trigger_price") if trig is None: continue try: trig_f = float(trig) except (TypeError, ValueError): continue oid = slot.get("order_id") out.append( { "id": str(oid) if oid is not None else "", "symbol": symbol, "channel": "algo", "category": "conditional", "label": f"{label} {trig_f:g}", "trigger_price": trig_f, "amount": slot.get("amount"), "status": "open", } ) return out def _exchange_tpsl_from_hub_order(hub_orders: list, symbol: str, side: str) -> dict | None: """趋势保本移交后:用下单监控计划价补全 exchange_tpsl(与实例页一致)。""" side_l = (side or "").lower() for o in hub_orders: if not isinstance(o, dict): continue o_sym = o.get("exchange_symbol") or o.get("symbol") or "" if not _symbols_match(symbol, o_sym): continue if (o.get("direction") or "").lower() != side_l: continue sl = o.get("stop_loss") tp = o.get("take_profit") if sl in (None, "") and tp in (None, ""): continue slots: dict = {"sl": None, "tp": None} if sl not in (None, ""): try: slots["sl"] = {"trigger_price": float(sl), "order_id": None} except (TypeError, ValueError): pass if tp not in (None, ""): try: slots["tp"] = {"trigger_price": float(tp), "order_id": None} except (TypeError, ValueError): pass if slots["sl"] or slots["tp"]: return slots return None def _find_exchange_tpsl_for_position( symbol: str, side: str, order_prices: list, hub_orders: list, ) -> dict | None: side_l = (side or "").lower() op_by_id = { op.get("id"): op for op in order_prices if isinstance(op, dict) and op.get("id") is not None } for o in hub_orders: if not isinstance(o, dict): continue o_sym = o.get("exchange_symbol") or o.get("symbol") or "" if not _symbols_match(symbol, o_sym): continue if (o.get("direction") or "").lower() != side_l: continue op = op_by_id.get(o.get("id")) if not isinstance(op, dict): continue et = op.get("exchange_tpsl") if isinstance(et, dict) and (et.get("sl") or et.get("tp")): return et for op in order_prices: if not isinstance(op, dict): continue if not _symbols_match(symbol, op.get("symbol") or ""): continue et = op.get("exchange_tpsl") if isinstance(et, dict) and (et.get("sl") or et.get("tp")): return et return None def _merge_flask_order_price_fields(hub_mon: dict | None, snap: dict | None) -> None: """将 price_snapshot 中的快照盈亏比、已保本状态合并进 hub_monitor.orders。""" if not isinstance(hub_mon, dict) or not isinstance(snap, dict): return order_prices = snap.get("order_prices") or [] op_by_id = { op.get("id"): op for op in order_prices if isinstance(op, dict) and op.get("id") is not None } orders = hub_mon.get("orders") or [] if not isinstance(orders, list): return for o in orders: if not isinstance(o, dict): continue op = op_by_id.get(o.get("id")) if not isinstance(op, dict): continue if op.get("rr_ratio") is not None: o["rr_ratio"] = op["rr_ratio"] if "sl_breakeven_secured" in op: o["sl_breakeven_secured"] = bool(op["sl_breakeven_secured"]) for key in ( "stop_loss", "take_profit", "stop_loss_display", "take_profit_display", "display_rr_ratio", ): if key in op and op[key] not in (None, ""): o[key] = op[key] def _merge_flask_position_breakeven(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: """将 price_snapshot 的已保本状态同步到 agent 持仓,供中控首页表格展示。""" ag = agent_row.get("agent") if not isinstance(ag, dict) or not isinstance(snap, dict): return positions = ag.get("positions") if not isinstance(positions, list) or not positions: return order_prices = snap.get("order_prices") or [] hub_orders = [] if isinstance(hub_mon, dict): hub_orders = hub_mon.get("orders") or [] op_by_id = { op.get("id"): op for op in order_prices if isinstance(op, dict) and op.get("id") is not None } for p in positions: if not isinstance(p, dict): continue sym = p.get("symbol") or "" side = (p.get("side") or "").lower() matched = None for o in hub_orders: if not isinstance(o, dict): continue o_sym = o.get("exchange_symbol") or o.get("symbol") or "" if not _symbols_match(sym, o_sym): continue if (o.get("direction") or "").lower() != side: continue matched = op_by_id.get(o.get("id")) break if matched is None: for op in order_prices: if not isinstance(op, dict): continue if not _symbols_match(sym, op.get("symbol") or ""): continue matched = op break if isinstance(matched, dict) and "sl_breakeven_secured" in matched: p["sl_breakeven_secured"] = bool(matched["sl_breakeven_secured"]) def _agent_position_has_mark(p: dict) -> bool: try: v = float(p.get("mark_price")) return v > 0 except (TypeError, ValueError): return False def _apply_agent_mark_price(p: dict, mark_price: object, mark_display: object = None) -> None: try: mpf = float(mark_price) except (TypeError, ValueError): return if mpf <= 0: return p["mark_price"] = mpf disp = mark_display if disp is not None and str(disp).strip() not in ("", "-"): p["mark_price_fmt"] = str(disp) def _find_matched_order_price_op( p: dict, order_prices: list, hub_orders: list, op_by_id: dict, ) -> dict | None: sym = p.get("symbol") or "" side = (p.get("side") or "").lower() for o in hub_orders: if not isinstance(o, dict): continue o_sym = o.get("exchange_symbol") or o.get("symbol") or "" if not _symbols_match(sym, o_sym): continue if (o.get("direction") or "").lower() != side: continue matched = op_by_id.get(o.get("id")) if isinstance(matched, dict): return matched break for op in order_prices: if not isinstance(op, dict): continue if not _symbols_match(sym, op.get("symbol") or ""): continue return op return None def _merge_flask_position_mark_price( agent_row: dict, snap: dict | None, hub_mon: dict | None ) -> None: """子代理无标记价时,用实例 price_snapshot 的交易所标记价补全中控持仓展示。""" ag = agent_row.get("agent") if not isinstance(ag, dict) or not isinstance(snap, dict): return positions = ag.get("positions") if not isinstance(positions, list) or not positions: return order_prices = snap.get("order_prices") or [] hub_orders = [] if isinstance(hub_mon, dict): hub_orders = hub_mon.get("orders") or [] op_by_id = { op.get("id"): op for op in order_prices if isinstance(op, dict) and op.get("id") is not None } for p in positions: if not isinstance(p, dict) or _agent_position_has_mark(p): continue matched = _find_matched_order_price_op(p, order_prices, hub_orders, op_by_id) if isinstance(matched, dict): _apply_agent_mark_price( p, matched.get("exchange_mark_price"), matched.get("exchange_mark_price_display"), ) position_marks = snap.get("position_marks") or [] if not isinstance(position_marks, list): return for p in positions: if not isinstance(p, dict) or _agent_position_has_mark(p): continue sym = p.get("symbol") or "" side = (p.get("side") or "").lower() for pm in position_marks: if not isinstance(pm, dict): continue if not _symbols_match(sym, pm.get("symbol") or ""): continue if (pm.get("side") or "").lower() != side: continue _apply_agent_mark_price( p, pm.get("mark_price"), pm.get("mark_price_display") ) break def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: """子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。""" ag = agent_row.get("agent") if not isinstance(ag, dict): return positions = ag.get("positions") if not isinstance(positions, list) or not positions: return if not isinstance(snap, dict): return order_prices = snap.get("order_prices") or [] hub_orders = [] if isinstance(hub_mon, dict): hub_orders = hub_mon.get("orders") or [] for p in positions: if not isinstance(p, dict): continue sym = p.get("symbol") or "" side = p.get("side") or "" et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders) if not et: et = _exchange_tpsl_from_hub_order(hub_orders, sym, side) if not et: continue p["exchange_tpsl"] = et cond = p.get("conditional_orders") or [] merged = _tpsl_slots_to_conditional_orders(et, sym) p["conditional_orders"] = _merge_conditional_orders_no_dup(cond, merged) async def _fetch_exchange_flask_bundle( client: httpx.AsyncClient, ex: dict ) -> tuple[dict | None, dict | None, list | None, dict | None, dict | None]: """单所 Flask:monitor / meta / price_snapshot / account(有 flask_url 时)并行拉取。""" caps = ex.get("capabilities") or [] tasks = [ _fetch_flask_json(client, ex, "/api/hub/monitor"), _fetch_flask_json(client, ex, "/api/hub/meta"), ] has_flask = bool((ex.get("flask_url") or "").strip()) if has_flask: tasks.extend( [ _fetch_flask_json(client, ex, "/api/price_snapshot"), _fetch_flask_json(client, ex, "/api/hub/account"), ] ) results = await asyncio.gather(*tasks) hub_mon = results[0] meta = results[1] snap = results[2] if has_flask and len(results) > 2 else None account = results[3] if has_flask and len(results) > 3 else None key_prices = None want_prices = HUB_BOARD_KEY_PRICES and "key" in caps if want_prices and isinstance(snap, dict): key_prices = snap.get("key_prices") return ( hub_mon, meta, key_prices, snap if isinstance(snap, dict) else None, account if isinstance(account, dict) else None, ) async def _assemble_board_row( client: httpx.AsyncClient, ex: dict, agent_row: dict ) -> dict: hub_mon, meta, key_prices, snap, account = await _fetch_exchange_flask_bundle( client, ex ) if isinstance(hub_mon, dict): _merge_flask_order_price_fields(hub_mon, snap) _merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) _merge_flask_position_breakeven(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) _merge_flask_position_mark_price(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False acct_ok = isinstance(account, dict) and account.get("ok") is not False raw_review = (ex.get("review_url") or "").strip() review_link = browser_url(raw_review) if raw_review else default_review_url( ex.get("flask_url") ) return { **agent_row, "flask_url": ex.get("flask_url") or "", "flask_url_browser": browser_url(ex.get("flask_url")), "review_url": review_link, "hub_monitor": hub_mon, "flask_ok": flask_ok, "flask_error": _flask_error_from_hub_mon(hub_mon if isinstance(hub_mon, dict) else None), "meta": (meta or {}).get("meta") if isinstance(meta, dict) else meta, "key_prices": key_prices, "funding_usdt": account.get("funding_usdt") if acct_ok else None, "trading_usdt": account.get("trading_usdt") if acct_ok else None, "available_trading_usdt": account.get("available_trading_usdt") if acct_ok else None, "account_ok": acct_ok, } async def _build_monitor_board_payload() -> dict: exchanges = enabled_exchanges() async with httpx.AsyncClient() as client: agent_rows = await asyncio.gather( *[_fetch_agent_status(client, ex) for ex in exchanges] ) out = await asyncio.gather( *[ _assemble_board_row(client, ex, agent_row) for ex, agent_row in zip(exchanges, agent_rows) ] ) return { "rows": list(out), "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"), } @app.get("/api/monitor/board") @app.get("/api/monitor/board/snapshot") async def api_monitor_board_snapshot(): """读后台缓存快照;完整聚合由 hub 每 HUB_BOARD_POLL_INTERVAL 秒执行。""" return board_store.snapshot_dict() @app.get("/api/monitor/board/stream") async def api_monitor_board_stream(): from fastapi.responses import StreamingResponse return StreamingResponse( board_store.iter_sse(), media_type="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) @app.post("/api/monitor/board/refresh") async def api_monitor_board_refresh(): _schedule_board_refresh() return {"ok": True, "board_version": board_store.version} def _require_hub_logged_in(request: Request) -> None: if password_required() and not validate_session_token(request.cookies.get(SESSION_COOKIE)): raise HTTPException(status_code=401, detail="未登录中控") @app.get("/api/instance/open-url") def api_instance_open_url( request: Request, exchange_id: str, next: str = "/", embed: str = "", hub_theme: str = "", ): """已登录中控时生成实例 SSO 打开链接(2h 有效、单次使用,复用 HUB_BRIDGE_TOKEN)。""" _require_hub_logged_in(request) if not HUB_BRIDGE_TOKEN: raise HTTPException(status_code=503, detail="未配置 HUB_BRIDGE_TOKEN,无法签发实例打开链接") ex = _find_exchange(exchange_id) if not ex: raise HTTPException(status_code=404, detail="未知交易所 id") base = browser_url((ex.get("flask_url") or "").strip()).rstrip("/") if not base: raise HTTPException(status_code=400, detail="该账户未配置 flask_url") ex_key = (ex.get("key") or "").strip().lower() if not ex_key: raise HTTPException(status_code=400, detail="该账户缺少 key(用于 SSO 校验)") nxt = safe_next_path(next) token = mint_hub_sso_token(ex_key, nxt) if not token: raise HTTPException(status_code=503, detail="签发 SSO 失败") params = {"token": token, "next": nxt} if (embed or "").strip().lower() in ("1", "true", "yes", "on"): params["embed"] = "1" ht = (hub_theme or "").strip().lower() if ht in ("light", "dark"): params["hub_theme"] = ht q = urlencode(params) return { "ok": True, "url": f"{base}/hub-sso?{q}", "expires_in": HUB_SSO_TTL_SEC, "exchange_id": exchange_id, "exchange_key": ex_key, } class CloseAllBody(BaseModel): exclude_ids: list[str] = Field(default_factory=list) class ClosePositionBody(BaseModel): symbol: str side: str class CancelOrderBody(BaseModel): symbol: str order_id: str channel: str = "regular" class CancelSymbolOrdersBody(BaseModel): symbol: str scope: str = "all" class PlaceTpslBody(BaseModel): symbol: str side: str stop_loss: float take_profit: float contracts: float | None = None class TrendPlanActionBody(BaseModel): plan_id: int breakeven_offset_pct: float | None = None def _flask_hub_messages(parsed: dict | None) -> tuple[bool, str]: if not isinstance(parsed, dict): return False, "实例返回无效" msgs = list(parsed.get("messages") or []) if parsed.get("msg"): msgs.insert(0, str(parsed["msg"])) if parsed.get("error"): msgs.append(str(parsed["error"])) ok = parsed.get("ok") is not False if parsed.get("ok") is True: ok = True elif parsed.get("ok") is False: ok = False else: for m in msgs: if any( k in str(m) for k in ("失败", "错误", "无法", "缺少", "过期", "未找到", "不允许", "异常") ): ok = False break text = ";".join(str(x) for x in msgs if x) or ("成功" if ok else "操作失败") return ok, text @app.post("/api/trend/{exchange_id}/stop") async def api_trend_plan_stop(exchange_id: str, body: TrendPlanActionBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") if "trend" not in (ex.get("capabilities") or []): raise HTTPException(status_code=400, detail="该账户未启用趋势计划监控") pid = int(body.plan_id) async with httpx.AsyncClient() as client: parsed = await _fetch_flask_json( client, ex, f"/api/hub/trend/stop/{pid}", method="POST" ) ok, text = _flask_hub_messages(parsed) _schedule_board_refresh() return {"ok": ok, "message": text, "payload": parsed} @app.post("/api/trend/{exchange_id}/breakeven") async def api_trend_plan_breakeven(exchange_id: str, body: TrendPlanActionBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") if "trend" not in (ex.get("capabilities") or []): raise HTTPException(status_code=400, detail="该账户未启用趋势计划监控") pid = int(body.plan_id) data = {} if body.breakeven_offset_pct is not None: data["breakeven_offset_pct"] = str(body.breakeven_offset_pct) async with httpx.AsyncClient() as client: parsed = await _fetch_flask_json( client, ex, f"/api/hub/trend/breakeven/{pid}", method="POST", data=data, ) ok, text = _flask_hub_messages(parsed) _schedule_board_refresh() return {"ok": ok, "message": text, "payload": parsed} @app.post("/api/orders/{exchange_id}/cancel") async def api_cancel_order(exchange_id: str, body: CancelOrderBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/orders/cancel" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={ "symbol": body.symbol, "order_id": body.order_id, "channel": body.channel or "regular", }, timeout=60.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } _schedule_board_refresh() return out @app.post("/api/orders/{exchange_id}/cancel-symbol") async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/orders/cancel-symbol" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={"symbol": body.symbol, "scope": body.scope or "all"}, timeout=120.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } _schedule_board_refresh() return out @app.post("/api/close/{exchange_id}/position") async def api_close_position(exchange_id: str, body: ClosePositionBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") sym = (body.symbol or "").strip() side = (body.side or "").strip().lower() if not sym: raise HTTPException(status_code=400, detail="symbol 不能为空") if side not in ("long", "short"): raise HTTPException(status_code=400, detail="side 须为 long 或 short") url = f"{ex['agent_url'].rstrip('/')}/emergency/close-position" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={"symbol": sym, "side": side}, timeout=120.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } if out.get("ok") and "trend" in (ex.get("capabilities") or []): async with httpx.AsyncClient() as flask_client: sync_parsed = await _fetch_flask_json( flask_client, ex, "/api/hub/trend/sync-flat", method="POST", json_body={"symbol": sym, "side": side}, ) if isinstance(sync_parsed, dict): out["trend_sync"] = sync_parsed _schedule_board_refresh() return out @app.post("/api/orders/{exchange_id}/place-tpsl") async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/orders/place-tpsl" async with httpx.AsyncClient() as client: r = await client.post( url, headers=_agent_headers(), json={ "symbol": body.symbol, "side": body.side, "stop_loss": body.stop_loss, "take_profit": body.take_profit, "contracts": body.contracts, }, timeout=120.0, ) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } _schedule_board_refresh() return out @app.post("/api/close/{exchange_id}") async def api_close_exchange(exchange_id: str): ex = _find_exchange(exchange_id) if not ex or not ex.get("enabled"): raise HTTPException(status_code=404, detail="账户未启用") url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all" async with httpx.AsyncClient() as client: r = await client.post(url, headers=_agent_headers(), timeout=120.0) try: body = r.json() except Exception: body = {"raw": (r.text or "")[:2000]} out = {"exchange": ex, "status_code": r.status_code, "payload": body} _schedule_board_refresh() return out @app.post("/api/close-all") async def api_close_all(body: CloseAllBody | None = Body(default=None)): excl = set(body.exclude_ids if body else []) excl |= env_force_disabled_ids() targets = [x for x in enabled_exchanges() if str(x["id"]) not in excl] async with httpx.AsyncClient() as client: async def one(ex: dict): url = f"{ex['agent_url'].rstrip('/')}/emergency/close-all" try: r = await client.post(url, headers=_agent_headers(), timeout=120.0) try: payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} return {"id": ex["id"], "name": ex["name"], "status_code": r.status_code, "payload": payload} except Exception as e: return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)} results = await asyncio.gather(*[one(ex) for ex in targets]) _schedule_board_refresh() return {"results": list(results)} def _trade_removed_response(): """旧版前端或缓存页面仍会请求 /api/trade/*,勿解析表单,直接返回说明。""" return JSONResponse( { "ok": False, "result": { "ok": False, "messages": [ "中控已移除下单区。请在监控卡片点击「实例」," "进入对应 crypto_monitor_* 网页添加关键位或下单。" ], }, "deprecated": True, }, status_code=410, ) def _parse_anchor_ms(at: str = "", anchor_ms: str = "") -> int | None: raw = (anchor_ms or at or "").strip() if not raw: return None return parse_wall_clock_ms(raw) @app.get("/api/archive/meta") def api_archive_meta(): init_archive_db() exchanges = [] for ex in enabled_exchanges(load_settings()): exchanges.append( { "id": ex.get("id"), "key": ex.get("key"), "name": ex.get("name"), } ) return { "ok": True, "timeframes": sorted(ARCHIVE_TIMEFRAMES), "default_timeframe": ARCHIVE_DEFAULT_TIMEFRAME, "seed_lookback_days": ARCHIVE_SEED_LOOKBACK_DAYS, "sync_interval_sec": ARCHIVE_SYNC_INTERVAL_SEC, "visible_bars_default": ARCHIVE_VISIBLE_BARS_DEFAULT, "exchanges": exchanges, "last_sync": _last_archive_sync, } @app.get("/api/archive/list") def api_archive_list( exchange_key: str = "", filter_profit: str = "", filter_loss: str = "", filter_sick: str = "", filter_emotion: str = "", ): init_archive_db() rows = list_symbol_rows( exchange_key=exchange_key, filter_profit=(filter_profit or "").lower() in ("1", "true", "yes", "on"), filter_loss=(filter_loss or "").lower() in ("1", "true", "yes", "on"), filter_sick=(filter_sick or "").lower() in ("1", "true", "yes", "on"), filter_emotion=(filter_emotion or "").lower() in ("1", "true", "yes", "on"), ) return {"ok": True, "rows": rows, "count": len(rows)} @app.get("/api/archive/detail") def api_archive_detail(exchange_key: str = "", symbol: str = ""): ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() if not ex_k or not sym: raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol") init_archive_db() trades = load_symbol_trades(ex_k, sym) return {"ok": True, "exchange_key": ex_k, "symbol": sym, "trades": trades} @app.get("/api/archive/ohlcv") def api_archive_ohlcv( exchange_key: str = "", symbol: str = "", timeframe: str = ARCHIVE_DEFAULT_TIMEFRAME, mode: str = "hold", anchor_ms: str = "", opened_ms: str = "", closed_ms: str = "", range: str = "", at: str = "", bars: str = "", ): ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() if not ex_k or not sym: raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol") init_archive_db() anchor = _parse_anchor_ms(at, anchor_ms) open_ms = _parse_anchor_ms("", opened_ms) close_ms = _parse_anchor_ms("", closed_ms) try: bar_n = int(bars) if (bars or "").strip().isdigit() else ARCHIVE_VISIBLE_BARS_DEFAULT except ValueError: bar_n = ARCHIVE_VISIBLE_BARS_DEFAULT result = resolve_archive_chart( ex_k, sym, timeframe, anchor_ms=anchor, opened_ms=open_ms, closed_ms=close_ms, mode=mode, bars=bar_n, range_mode=(range or "").strip().lower() or "window", ) if not result.get("ok"): raise HTTPException(status_code=404, detail=result.get("msg") or "无 K 线") return result class ArchiveOverlayBody(BaseModel): behavior_tag: str = "" note: str = "" @app.patch("/api/archive/trade/{exchange_key}/{trade_id}") def api_archive_trade_overlay( exchange_key: str, trade_id: int, body: ArchiveOverlayBody = Body(...), ): ex_k = (exchange_key or "").strip().lower() if not ex_k: raise HTTPException(status_code=400, detail="缺少 exchange_key") init_archive_db() out = upsert_trade_overlay( ex_k, int(trade_id), behavior_tag=body.behavior_tag, note=body.note, ) return {"ok": True, "overlay": out} @app.delete("/api/archive/trade/{exchange_key}/{trade_id}") def api_archive_trade_delete(exchange_key: str, trade_id: int): from hub_symbol_archive_lib import delete_trade_from_archive ex_k = (exchange_key or "").strip().lower() if not ex_k: raise HTTPException(status_code=400, detail="缺少 exchange_key") init_archive_db() removed = delete_trade_from_archive(ex_k, int(trade_id)) if not removed: raise HTTPException(status_code=404, detail="档案中无该笔交易") return {"ok": True, "exchange_key": ex_k, "trade_id": int(trade_id)} @app.post("/api/archive/sync") async def api_archive_sync(): body = await _run_archive_sync_once() return body @app.get("/api/ping") def api_ping(): return { "ok": True, "service": "manual-trading-hub", "build": HUB_BUILD, "trade_ui": False, "features": ["monitor", "settings", "auth", "board_sse", "archive"], "board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL, "board_version": board_store.version, "board_aggregating": board_store.aggregating, "board_updated_at": (board_store.payload or {}).get("updated_at") if isinstance(board_store.payload, dict) else None, "board_error": board_store.last_error, "password_required": password_required(), "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""), } @app.post("/api/trade/order/{exchange_id}") @app.post("/api/trade/key/{exchange_id}") @app.post("/api/trade/trend/preview/{exchange_id}") @app.post("/api/trade/trend/execute/{exchange_id}") async def api_trade_removed(exchange_id: str): return _trade_removed_response() @app.get("/api/trade/meta/{exchange_id}") @app.get("/api/trade/trend/preview/{exchange_id}/{preview_id}") async def api_trade_removed_get(exchange_id: str, preview_id: str = ""): return _trade_removed_response() def main(): import uvicorn print( f"manual-trading-hub start build={HUB_BUILD} listen={HUB_HOST}:{HUB_PORT}", flush=True, ) uvicorn.run(app, host=HUB_HOST, port=HUB_PORT, log_level="info", access_log=False) if __name__ == "__main__": main()