fix(hub): merge mark price from Flask snapshot and fix board refresh

Sync hub positions with instance price_snapshot (order_prices and position_marks).
Fix monitor board UI when hub restarts (version rewind) and queue snapshot fetches.
Expose board aggregate status on /api/ping for diagnostics.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-03 22:44:01 +08:00
parent 2a9602610e
commit d07357b98e
9 changed files with 275 additions and 31 deletions
+8
View File
@@ -6198,10 +6198,18 @@ def api_price_snapshot():
)
order_prices.append(payload)
from hub_position_metrics import build_position_marks_list
position_marks = build_position_marks_list(
all_swap_positions,
format_mark_display=lambda sym, px: format_price_for_symbol(sym, px),
)
return jsonify({
"updated_at": app_now_str(),
"key_prices": key_prices,
"order_prices": order_prices,
"position_marks": position_marks,
"positions_raw_count": len(all_swap_positions),
})
+8
View File
@@ -6331,10 +6331,18 @@ def api_price_snapshot():
)
order_prices.append(payload)
from hub_position_metrics import build_position_marks_list
position_marks = build_position_marks_list(
all_swap_positions,
format_mark_display=lambda sym, px: format_price_for_symbol(sym, px),
)
return jsonify({
"updated_at": app_now_str(),
"key_prices": key_prices,
"order_prices": order_prices,
"position_marks": position_marks,
"positions_raw_count": len(all_swap_positions),
})
+8
View File
@@ -5770,10 +5770,18 @@ def api_price_snapshot():
)
order_prices.append(payload)
from hub_position_metrics import build_position_marks_list
position_marks = build_position_marks_list(
all_swap_positions,
format_mark_display=lambda sym, px: format_price_for_symbol(sym, px),
)
return jsonify({
"updated_at": app_now_str(),
"key_prices": key_prices,
"order_prices": order_prices,
"position_marks": position_marks,
"positions_raw_count": len(all_swap_positions),
})
+8
View File
@@ -6005,10 +6005,18 @@ def api_price_snapshot():
)
order_prices.append(payload)
from hub_position_metrics import build_position_marks_list
position_marks = build_position_marks_list(
all_swap_positions,
format_mark_display=lambda sym, px: format_price_for_symbol(sym, px),
)
return jsonify({
"updated_at": app_now_str(),
"key_prices": key_prices,
"order_prices": order_prices,
"position_marks": position_marks,
"positions_raw_count": len(all_swap_positions),
})
+115
View File
@@ -0,0 +1,115 @@
"""ccxt 持仓标记价解析(实例 price_snapshot 与中控子代理共用)。"""
from __future__ import annotations
import math
from typing import Any, Callable
def _finite_or_none(x: Any) -> float | None:
try:
f = float(x)
return f if math.isfinite(f) else None
except (TypeError, ValueError):
return None
def _coerce_float(*values: Any) -> float | None:
for v in values:
if v is None or v == "":
continue
px = _finite_or_none(v)
if px is not None and px > 0:
return px
return None
def position_contracts(p: dict[str, Any]) -> float:
raw = p.get("contracts")
if raw is not None:
try:
return float(raw)
except (TypeError, ValueError):
pass
info = p.get("info") or {}
if not isinstance(info, dict):
info = {}
for k in ("positionAmt", "positionamt", "pos", "size"):
if k in info:
try:
v = float(info[k])
if v != 0:
return v
except (TypeError, ValueError):
pass
return 0.0
def position_side_from_ccxt(p: dict[str, Any], contracts: float | None = None) -> str:
s = (p.get("side") or "").lower()
if s in ("long", "short"):
return s
c = contracts if contracts is not None else position_contracts(p)
if c > 0:
return "long"
if c < 0:
return "short"
return "long"
def parse_position_mark_price(p: dict[str, Any]) -> float | None:
"""四所 ccxt 持仓统一解析标记价(与 crypto_monitor_* parse_ccxt_position_metrics 口径一致)。"""
if not isinstance(p, dict):
return None
info = p.get("info") or {}
if not isinstance(info, dict):
info = {}
mark = _coerce_float(
p.get("markPrice"),
p.get("mark_price"),
p.get("mark"),
info.get("markPx"),
info.get("mark_price"),
info.get("markPrice"),
)
if mark is not None:
return mark
contracts = position_contracts(p)
if abs(contracts) >= 1e-12:
notional = _finite_or_none(p.get("notional"))
if notional is not None and abs(notional) > 0:
return abs(notional) / abs(contracts)
return None
def build_position_marks_list(
positions: list,
*,
format_mark_display: Callable[[str, float], str] | None = None,
) -> list[dict[str, Any]]:
"""从 fetch_positions 结果生成 position_marks,供 price_snapshot / 中控合并。"""
out: list[dict[str, Any]] = []
for p in positions or []:
if not isinstance(p, dict):
continue
c = position_contracts(p)
if abs(c) < 1e-12:
continue
mark = parse_position_mark_price(p)
if mark is None or mark <= 0:
continue
sym = (p.get("symbol") or "").strip()
side = position_side_from_ccxt(p, c)
row: dict[str, Any] = {
"symbol": sym,
"side": side,
"mark_price": mark,
}
if format_mark_display and sym:
try:
row["mark_price_display"] = format_mark_display(sym, mark)
except Exception:
row["mark_price_display"] = f"{mark:g}"
else:
row["mark_price_display"] = f"{mark:g}"
out.append(row)
return out
+3 -23
View File
@@ -31,6 +31,7 @@ _REPO_ROOT = Path(__file__).resolve().parents[1]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from hub_ohlcv_lib import format_price_by_tick, price_tick_from_market
from hub_position_metrics import parse_position_mark_price
import ccxt
from fastapi import FastAPI, Header, HTTPException, Request
@@ -409,29 +410,8 @@ def _position_entry_price(p: dict[str, Any]) -> float | None:
def _position_mark_price(p: dict[str, Any]) -> float | None:
"""四所 ccxt 持仓统一解析标记价(用于强平/浮盈计算)。"""
info = p.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in (
p.get("markPrice"),
p.get("mark_price"),
p.get("mark"),
info.get("markPx"),
info.get("mark_price"),
info.get("markPrice"),
info.get("last"),
info.get("lastPrice"),
):
px = _finite_or_none(key)
if px is not None and px > 0:
return px
contracts = _position_contracts(p)
if abs(contracts) >= 1e-12:
notional = _finite_or_none(p.get("notional"))
if notional is not None and abs(notional) > 0:
return abs(notional) / abs(contracts)
return None
"""四所 ccxt 持仓统一解析标记价(与实例 parse_ccxt_position_metrics 一致)。"""
return parse_position_mark_price(p)
def _ticker_mark_price(ex: Any, symbol: str) -> float | None:
+106 -1
View File
@@ -16,7 +16,6 @@ if str(_REPO_ROOT) not in sys.path:
from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days
from hub_ohlcv_lib import CHART_TIMEFRAME_ORDER, CHART_TIMEFRAMES, bar_limit_for_timeframe
from env_load import load_hub_dotenv
load_hub_dotenv()
@@ -705,6 +704,106 @@ def _merge_flask_position_breakeven(agent_row: dict, snap: dict | None, hub_mon:
p["sl_breakeven_secured"] = bool(matched["sl_breakeven_secured"])
def _agent_position_has_mark(p: dict) -> bool:
try:
v = float(p.get("mark_price"))
return v > 0
except (TypeError, ValueError):
return False
def _apply_agent_mark_price(p: dict, mark_price: object, mark_display: object = None) -> None:
try:
mpf = float(mark_price)
except (TypeError, ValueError):
return
if mpf <= 0:
return
p["mark_price"] = mpf
disp = mark_display
if disp is not None and str(disp).strip() not in ("", "-"):
p["mark_price_fmt"] = str(disp)
def _find_matched_order_price_op(
p: dict,
order_prices: list,
hub_orders: list,
op_by_id: dict,
) -> dict | None:
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
for o in hub_orders:
if not isinstance(o, dict):
continue
o_sym = o.get("exchange_symbol") or o.get("symbol") or ""
if not _symbols_match(sym, o_sym):
continue
if (o.get("direction") or "").lower() != side:
continue
matched = op_by_id.get(o.get("id"))
if isinstance(matched, dict):
return matched
break
for op in order_prices:
if not isinstance(op, dict):
continue
if not _symbols_match(sym, op.get("symbol") or ""):
continue
return op
return None
def _merge_flask_position_mark_price(
agent_row: dict, snap: dict | None, hub_mon: dict | None
) -> None:
"""子代理无标记价时,用实例 price_snapshot 的交易所标记价补全中控持仓展示。"""
ag = agent_row.get("agent")
if not isinstance(ag, dict) or not isinstance(snap, dict):
return
positions = ag.get("positions")
if not isinstance(positions, list) or not positions:
return
order_prices = snap.get("order_prices") or []
hub_orders = []
if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or []
op_by_id = {
op.get("id"): op
for op in order_prices
if isinstance(op, dict) and op.get("id") is not None
}
for p in positions:
if not isinstance(p, dict) or _agent_position_has_mark(p):
continue
matched = _find_matched_order_price_op(p, order_prices, hub_orders, op_by_id)
if isinstance(matched, dict):
_apply_agent_mark_price(
p,
matched.get("exchange_mark_price"),
matched.get("exchange_mark_price_display"),
)
position_marks = snap.get("position_marks") or []
if not isinstance(position_marks, list):
return
for p in positions:
if not isinstance(p, dict) or _agent_position_has_mark(p):
continue
sym = p.get("symbol") or ""
side = (p.get("side") or "").lower()
for pm in position_marks:
if not isinstance(pm, dict):
continue
if not _symbols_match(sym, pm.get("symbol") or ""):
continue
if (pm.get("side") or "").lower() != side:
continue
_apply_agent_mark_price(
p, pm.get("mark_price"), pm.get("mark_price_display")
)
break
def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None:
"""子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。"""
ag = agent_row.get("agent")
@@ -764,6 +863,7 @@ async def _assemble_board_row(
_merge_flask_order_price_fields(hub_mon, snap)
_merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
_merge_flask_position_breakeven(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
_merge_flask_position_mark_price(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None)
flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False
raw_review = (ex.get("review_url") or "").strip()
review_link = browser_url(raw_review) if raw_review else default_review_url(
@@ -1088,6 +1188,11 @@ def api_ping():
"features": ["monitor", "settings", "auth", "board_sse"],
"board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL,
"board_version": board_store.version,
"board_aggregating": board_store.aggregating,
"board_updated_at": (board_store.payload or {}).get("updated_at")
if isinstance(board_store.payload, dict)
else None,
"board_error": board_store.last_error,
"password_required": password_required(),
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""),
+18 -6
View File
@@ -12,6 +12,7 @@
let lastMonitorBoardUpdatedAt = "";
let localBoardVersion = 0;
let monitorBoardInFlight = false;
let monitorBoardFetchPending = false;
let monitorBoardSlowHintTimer = null;
let boardEventSource = null;
let sseReconnectTimer = null;
@@ -384,7 +385,7 @@
try {
const st = JSON.parse(ev.data || "{}");
const ver = Number(st.board_version) || 0;
if (ver > localBoardVersion) {
if (ver !== localBoardVersion) {
void fetchMonitorBoardSnapshot({ background: true });
} else if (st.aggregating && lastMonitorRows.length) {
applyMonitorBoardUi(lastMonitorRows, st.updated_at || lastMonitorBoardUpdatedAt, {
@@ -468,7 +469,7 @@
if (!cached) return false;
lastMonitorRows = cached.rows;
lastMonitorBoardUpdatedAt = cached.updated_at || "";
localBoardVersion = Number(cached.board_version) || 0;
localBoardVersion = 0;
applyMonitorBoardUi(cached.rows, lastMonitorBoardUpdatedAt, { stale: true });
return true;
}
@@ -660,7 +661,10 @@
const background = !!options.background;
const showLoading = !!options.showLoading && !lastMonitorRows.length;
const box = document.getElementById("monitor-grid");
if (monitorBoardInFlight && background) return;
if (monitorBoardInFlight) {
if (background) monitorBoardFetchPending = true;
else return;
}
if (showLoading && box) {
box.innerHTML =
'<div class="board-loading"><span class="board-loading-spin" aria-hidden="true"></span>正在加载监控快照…<p class="board-loading-sub"></p></div>';
@@ -687,11 +691,14 @@
}
return;
}
if (ver >= localBoardVersion || !lastMonitorRows.length) {
const ts = data.updated_at || "";
const versionChanged = ver !== localBoardVersion;
const timeChanged = ts && ts !== lastMonitorBoardUpdatedAt;
if (versionChanged || timeChanged || !lastMonitorRows.length) {
localBoardVersion = ver;
lastMonitorRows = rows;
saveMonitorBoardCache(lastMonitorRows, data.updated_at, ver);
applyMonitorBoardUi(lastMonitorRows, data.updated_at, {
saveMonitorBoardCache(lastMonitorRows, ts, ver);
applyMonitorBoardUi(lastMonitorRows, ts, {
stale: !!data.aggregating,
});
} else if (data.aggregating && lastMonitorRows.length) {
@@ -715,6 +722,10 @@
clearTimeout(fetchTimer);
clearMonitorBoardSlowHint();
monitorBoardInFlight = false;
if (monitorBoardFetchPending) {
monitorBoardFetchPending = false;
void fetchMonitorBoardSnapshot({ background: true });
}
}
}
@@ -724,6 +735,7 @@
}
try {
await requestMonitorBoardRefresh();
await fetchMonitorBoardSnapshot({ background: false });
} catch (e) {
showToast(String(e), true);
}
+1 -1
View File
@@ -245,6 +245,6 @@
<div id="toast"></div>
<script src="https://unpkg.com/lightweight-charts@4.2.0/dist/lightweight-charts.standalone.production.js"></script>
<script src="/assets/chart.js?v=20260603-hub-binance-tick"></script>
<script src="/assets/app.js?v=20260604-hub-mark-price2"></script>
<script src="/assets/app.js?v=20260604-hub-board-refresh"></script>
</body>
</html>