From d07357b98e673a55cca51757ae2e115d426590c9 Mon Sep 17 00:00:00 2001 From: dekun Date: Wed, 3 Jun 2026 22:44:01 +0800 Subject: [PATCH] fix(hub): merge mark price from Flask snapshot and fix board refresh Sync hub positions with instance price_snapshot (order_prices and position_marks). Fix monitor board UI when hub restarts (version rewind) and queue snapshot fetches. Expose board aggregate status on /api/ping for diagnostics. Co-authored-by: Cursor --- crypto_monitor_binance/app.py | 8 ++ crypto_monitor_gate/app.py | 8 ++ crypto_monitor_gate_bot/app.py | 8 ++ crypto_monitor_okx/app.py | 8 ++ hub_position_metrics.py | 115 +++++++++++++++++++++++++++ manual_trading_hub/agent.py | 26 +----- manual_trading_hub/hub.py | 107 ++++++++++++++++++++++++- manual_trading_hub/static/app.js | 24 ++++-- manual_trading_hub/static/index.html | 2 +- 9 files changed, 275 insertions(+), 31 deletions(-) create mode 100644 hub_position_metrics.py diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 2d58c45..0d064a2 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -6198,10 +6198,18 @@ def api_price_snapshot(): ) order_prices.append(payload) + from hub_position_metrics import build_position_marks_list + + position_marks = build_position_marks_list( + all_swap_positions, + format_mark_display=lambda sym, px: format_price_for_symbol(sym, px), + ) + return jsonify({ "updated_at": app_now_str(), "key_prices": key_prices, "order_prices": order_prices, + "position_marks": position_marks, "positions_raw_count": len(all_swap_positions), }) diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index 3e19af0..ad07c97 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -6331,10 +6331,18 @@ def api_price_snapshot(): ) order_prices.append(payload) + from hub_position_metrics import build_position_marks_list + + position_marks = build_position_marks_list( + all_swap_positions, + format_mark_display=lambda sym, px: format_price_for_symbol(sym, px), + ) + return jsonify({ "updated_at": app_now_str(), "key_prices": key_prices, "order_prices": order_prices, + "position_marks": position_marks, "positions_raw_count": len(all_swap_positions), }) diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 8462534..4bc8ed7 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -5770,10 +5770,18 @@ def api_price_snapshot(): ) order_prices.append(payload) + from hub_position_metrics import build_position_marks_list + + position_marks = build_position_marks_list( + all_swap_positions, + format_mark_display=lambda sym, px: format_price_for_symbol(sym, px), + ) + return jsonify({ "updated_at": app_now_str(), "key_prices": key_prices, "order_prices": order_prices, + "position_marks": position_marks, "positions_raw_count": len(all_swap_positions), }) diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index 65a53b1..d1a49de 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -6005,10 +6005,18 @@ def api_price_snapshot(): ) order_prices.append(payload) + from hub_position_metrics import build_position_marks_list + + position_marks = build_position_marks_list( + all_swap_positions, + format_mark_display=lambda sym, px: format_price_for_symbol(sym, px), + ) + return jsonify({ "updated_at": app_now_str(), "key_prices": key_prices, "order_prices": order_prices, + "position_marks": position_marks, "positions_raw_count": len(all_swap_positions), }) diff --git a/hub_position_metrics.py b/hub_position_metrics.py new file mode 100644 index 0000000..adccb61 --- /dev/null +++ b/hub_position_metrics.py @@ -0,0 +1,115 @@ +"""ccxt 持仓标记价解析(实例 price_snapshot 与中控子代理共用)。""" +from __future__ import annotations + +import math +from typing import Any, Callable + + +def _finite_or_none(x: Any) -> float | None: + try: + f = float(x) + return f if math.isfinite(f) else None + except (TypeError, ValueError): + return None + + +def _coerce_float(*values: Any) -> float | None: + for v in values: + if v is None or v == "": + continue + px = _finite_or_none(v) + if px is not None and px > 0: + return px + return None + + +def position_contracts(p: dict[str, Any]) -> float: + raw = p.get("contracts") + if raw is not None: + try: + return float(raw) + except (TypeError, ValueError): + pass + info = p.get("info") or {} + if not isinstance(info, dict): + info = {} + for k in ("positionAmt", "positionamt", "pos", "size"): + if k in info: + try: + v = float(info[k]) + if v != 0: + return v + except (TypeError, ValueError): + pass + return 0.0 + + +def position_side_from_ccxt(p: dict[str, Any], contracts: float | None = None) -> str: + s = (p.get("side") or "").lower() + if s in ("long", "short"): + return s + c = contracts if contracts is not None else position_contracts(p) + if c > 0: + return "long" + if c < 0: + return "short" + return "long" + + +def parse_position_mark_price(p: dict[str, Any]) -> float | None: + """四所 ccxt 持仓统一解析标记价(与 crypto_monitor_* parse_ccxt_position_metrics 口径一致)。""" + if not isinstance(p, dict): + return None + info = p.get("info") or {} + if not isinstance(info, dict): + info = {} + mark = _coerce_float( + p.get("markPrice"), + p.get("mark_price"), + p.get("mark"), + info.get("markPx"), + info.get("mark_price"), + info.get("markPrice"), + ) + if mark is not None: + return mark + contracts = position_contracts(p) + if abs(contracts) >= 1e-12: + notional = _finite_or_none(p.get("notional")) + if notional is not None and abs(notional) > 0: + return abs(notional) / abs(contracts) + return None + + +def build_position_marks_list( + positions: list, + *, + format_mark_display: Callable[[str, float], str] | None = None, +) -> list[dict[str, Any]]: + """从 fetch_positions 结果生成 position_marks,供 price_snapshot / 中控合并。""" + out: list[dict[str, Any]] = [] + for p in positions or []: + if not isinstance(p, dict): + continue + c = position_contracts(p) + if abs(c) < 1e-12: + continue + mark = parse_position_mark_price(p) + if mark is None or mark <= 0: + continue + sym = (p.get("symbol") or "").strip() + side = position_side_from_ccxt(p, c) + row: dict[str, Any] = { + "symbol": sym, + "side": side, + "mark_price": mark, + } + if format_mark_display and sym: + try: + row["mark_price_display"] = format_mark_display(sym, mark) + except Exception: + row["mark_price_display"] = f"{mark:g}" + else: + row["mark_price_display"] = f"{mark:g}" + out.append(row) + return out diff --git a/manual_trading_hub/agent.py b/manual_trading_hub/agent.py index a00655e..16302d2 100644 --- a/manual_trading_hub/agent.py +++ b/manual_trading_hub/agent.py @@ -31,6 +31,7 @@ _REPO_ROOT = Path(__file__).resolve().parents[1] if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) from hub_ohlcv_lib import format_price_by_tick, price_tick_from_market +from hub_position_metrics import parse_position_mark_price import ccxt from fastapi import FastAPI, Header, HTTPException, Request @@ -409,29 +410,8 @@ def _position_entry_price(p: dict[str, Any]) -> float | None: def _position_mark_price(p: dict[str, Any]) -> float | None: - """四所 ccxt 持仓统一解析标记价(用于强平/浮盈计算)。""" - info = p.get("info") or {} - if not isinstance(info, dict): - info = {} - for key in ( - p.get("markPrice"), - p.get("mark_price"), - p.get("mark"), - info.get("markPx"), - info.get("mark_price"), - info.get("markPrice"), - info.get("last"), - info.get("lastPrice"), - ): - px = _finite_or_none(key) - if px is not None and px > 0: - return px - contracts = _position_contracts(p) - if abs(contracts) >= 1e-12: - notional = _finite_or_none(p.get("notional")) - if notional is not None and abs(notional) > 0: - return abs(notional) / abs(contracts) - return None + """四所 ccxt 持仓统一解析标记价(与实例 parse_ccxt_position_metrics 一致)。""" + return parse_position_mark_price(p) def _ticker_mark_price(ex: Any, symbol: str) -> float | None: diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 63f7df1..4476d21 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -16,7 +16,6 @@ if str(_REPO_ROOT) not in sys.path: from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days from hub_ohlcv_lib import CHART_TIMEFRAME_ORDER, CHART_TIMEFRAMES, bar_limit_for_timeframe - from env_load import load_hub_dotenv load_hub_dotenv() @@ -705,6 +704,106 @@ def _merge_flask_position_breakeven(agent_row: dict, snap: dict | None, hub_mon: p["sl_breakeven_secured"] = bool(matched["sl_breakeven_secured"]) +def _agent_position_has_mark(p: dict) -> bool: + try: + v = float(p.get("mark_price")) + return v > 0 + except (TypeError, ValueError): + return False + + +def _apply_agent_mark_price(p: dict, mark_price: object, mark_display: object = None) -> None: + try: + mpf = float(mark_price) + except (TypeError, ValueError): + return + if mpf <= 0: + return + p["mark_price"] = mpf + disp = mark_display + if disp is not None and str(disp).strip() not in ("", "-"): + p["mark_price_fmt"] = str(disp) + + +def _find_matched_order_price_op( + p: dict, + order_prices: list, + hub_orders: list, + op_by_id: dict, +) -> dict | None: + sym = p.get("symbol") or "" + side = (p.get("side") or "").lower() + for o in hub_orders: + if not isinstance(o, dict): + continue + o_sym = o.get("exchange_symbol") or o.get("symbol") or "" + if not _symbols_match(sym, o_sym): + continue + if (o.get("direction") or "").lower() != side: + continue + matched = op_by_id.get(o.get("id")) + if isinstance(matched, dict): + return matched + break + for op in order_prices: + if not isinstance(op, dict): + continue + if not _symbols_match(sym, op.get("symbol") or ""): + continue + return op + return None + + +def _merge_flask_position_mark_price( + agent_row: dict, snap: dict | None, hub_mon: dict | None +) -> None: + """子代理无标记价时,用实例 price_snapshot 的交易所标记价补全中控持仓展示。""" + ag = agent_row.get("agent") + if not isinstance(ag, dict) or not isinstance(snap, dict): + return + positions = ag.get("positions") + if not isinstance(positions, list) or not positions: + return + order_prices = snap.get("order_prices") or [] + hub_orders = [] + if isinstance(hub_mon, dict): + hub_orders = hub_mon.get("orders") or [] + op_by_id = { + op.get("id"): op + for op in order_prices + if isinstance(op, dict) and op.get("id") is not None + } + for p in positions: + if not isinstance(p, dict) or _agent_position_has_mark(p): + continue + matched = _find_matched_order_price_op(p, order_prices, hub_orders, op_by_id) + if isinstance(matched, dict): + _apply_agent_mark_price( + p, + matched.get("exchange_mark_price"), + matched.get("exchange_mark_price_display"), + ) + position_marks = snap.get("position_marks") or [] + if not isinstance(position_marks, list): + return + for p in positions: + if not isinstance(p, dict) or _agent_position_has_mark(p): + continue + sym = p.get("symbol") or "" + side = (p.get("side") or "").lower() + for pm in position_marks: + if not isinstance(pm, dict): + continue + if not _symbols_match(sym, pm.get("symbol") or ""): + continue + if (pm.get("side") or "").lower() != side: + continue + _apply_agent_mark_price( + p, pm.get("mark_price"), pm.get("mark_price_display") + ) + break + + def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: """子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。""" ag = agent_row.get("agent") @@ -764,6 +863,7 @@ async def _assemble_board_row( _merge_flask_order_price_fields(hub_mon, snap) _merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) _merge_flask_position_breakeven(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) + _merge_flask_position_mark_price(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False raw_review = (ex.get("review_url") or "").strip() review_link = browser_url(raw_review) if raw_review else default_review_url( @@ -1088,6 +1188,11 @@ def api_ping(): "features": ["monitor", "settings", "auth", "board_sse"], "board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL, "board_version": board_store.version, + "board_aggregating": board_store.aggregating, + "board_updated_at": (board_store.payload or {}).get("updated_at") + if isinstance(board_store.payload, dict) + else None, + "board_error": board_store.last_error, "password_required": password_required(), "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""), diff --git a/manual_trading_hub/static/app.js b/manual_trading_hub/static/app.js index d6e210f..de9be2f 100644 --- a/manual_trading_hub/static/app.js +++ b/manual_trading_hub/static/app.js @@ -12,6 +12,7 @@ let lastMonitorBoardUpdatedAt = ""; let localBoardVersion = 0; let monitorBoardInFlight = false; + let monitorBoardFetchPending = false; let monitorBoardSlowHintTimer = null; let boardEventSource = null; let sseReconnectTimer = null; @@ -384,7 +385,7 @@ try { const st = JSON.parse(ev.data || "{}"); const ver = Number(st.board_version) || 0; - if (ver > localBoardVersion) { + if (ver !== localBoardVersion) { void fetchMonitorBoardSnapshot({ background: true }); } else if (st.aggregating && lastMonitorRows.length) { applyMonitorBoardUi(lastMonitorRows, st.updated_at || lastMonitorBoardUpdatedAt, { @@ -468,7 +469,7 @@ if (!cached) return false; lastMonitorRows = cached.rows; lastMonitorBoardUpdatedAt = cached.updated_at || ""; - localBoardVersion = Number(cached.board_version) || 0; + localBoardVersion = 0; applyMonitorBoardUi(cached.rows, lastMonitorBoardUpdatedAt, { stale: true }); return true; } @@ -660,7 +661,10 @@ const background = !!options.background; const showLoading = !!options.showLoading && !lastMonitorRows.length; const box = document.getElementById("monitor-grid"); - if (monitorBoardInFlight && background) return; + if (monitorBoardInFlight) { + if (background) monitorBoardFetchPending = true; + else return; + } if (showLoading && box) { box.innerHTML = '
正在加载监控快照…

'; @@ -687,11 +691,14 @@ } return; } - if (ver >= localBoardVersion || !lastMonitorRows.length) { + const ts = data.updated_at || ""; + const versionChanged = ver !== localBoardVersion; + const timeChanged = ts && ts !== lastMonitorBoardUpdatedAt; + if (versionChanged || timeChanged || !lastMonitorRows.length) { localBoardVersion = ver; lastMonitorRows = rows; - saveMonitorBoardCache(lastMonitorRows, data.updated_at, ver); - applyMonitorBoardUi(lastMonitorRows, data.updated_at, { + saveMonitorBoardCache(lastMonitorRows, ts, ver); + applyMonitorBoardUi(lastMonitorRows, ts, { stale: !!data.aggregating, }); } else if (data.aggregating && lastMonitorRows.length) { @@ -715,6 +722,10 @@ clearTimeout(fetchTimer); clearMonitorBoardSlowHint(); monitorBoardInFlight = false; + if (monitorBoardFetchPending) { + monitorBoardFetchPending = false; + void fetchMonitorBoardSnapshot({ background: true }); + } } } @@ -724,6 +735,7 @@ } try { await requestMonitorBoardRefresh(); + await fetchMonitorBoardSnapshot({ background: false }); } catch (e) { showToast(String(e), true); } diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index 88cd056..f5c9557 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -245,6 +245,6 @@
- +