feat: add hub fund overview tab with 180-day equity curves

Add /funds page for total and per-account balance (funding+trading), drawdown, and daily snapshots from monitor board aggregation.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-10 16:50:47 +08:00
parent 6eb17b7ddc
commit 77c7bbbb13
14 changed files with 1069 additions and 112 deletions
+384
View File
@@ -0,0 +1,384 @@
"""中控资金概况:分户日快照(180 交易日)、总资金曲线与回撤。"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Optional
from hub_trades_lib import current_trading_day
HUB_DIR = Path(__file__).resolve().parent / "manual_trading_hub"
FUND_HISTORY_PATH = HUB_DIR / "hub_fund_history.json"
LEGACY_FUND_HISTORY_PATH = HUB_DIR / "hub_ai_fund_history.json"
try:
FUND_HISTORY_DAYS = max(30, int(os.getenv("HUB_FUND_HISTORY_DAYS", "180") or "180"))
except ValueError:
FUND_HISTORY_DAYS = 180
def _now_str() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _safe_float(value: Any) -> Optional[float]:
try:
v = float(value)
return v if v >= 0 else None
except (TypeError, ValueError):
return None
def account_total_usdt(funding: Any, trading: Any) -> Optional[float]:
"""资金户 + 交易户;任一侧缺失则不计入(返回 None)。"""
fu = _safe_float(funding)
tu = _safe_float(trading)
if fu is None or tu is None:
return None
return round(fu + tu, 4)
def compute_drawdown(values: list[float]) -> dict[str, Any]:
"""基于资金权益序列计算峰值回撤(U 与 %)。"""
peak = 0.0
max_dd_u = 0.0
peak_at_end = 0.0
for v in values:
if not isinstance(v, (int, float)):
continue
fv = float(v)
if fv > peak:
peak = fv
dd = peak - fv
if dd > max_dd_u:
max_dd_u = dd
peak_at_end = peak
max_dd_u = round(max_dd_u, 4)
peak_at_end = round(peak_at_end, 4)
max_dd_pct = round((max_dd_u / peak_at_end) * 100, 2) if peak_at_end > 0 else None
return {
"peak_usdt": peak_at_end,
"max_drawdown_u": max_dd_u,
"max_drawdown_pct": max_dd_pct,
}
def _atomic_write(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(tmp, path)
def _prune_days(days: dict, *, keep_days: int, anchor_day: str) -> dict:
try:
anchor = datetime.strptime(anchor_day[:10], "%Y-%m-%d")
except ValueError:
anchor = datetime.now()
cutoff = (anchor - timedelta(days=max(1, keep_days) - 1)).strftime("%Y-%m-%d")
return {k: v for k, v in (days or {}).items() if str(k) >= cutoff}
def _migrate_legacy_store(days: dict) -> dict:
if not LEGACY_FUND_HISTORY_PATH.is_file():
return days
try:
loaded = json.loads(LEGACY_FUND_HISTORY_PATH.read_text(encoding="utf-8"))
legacy_days = loaded.get("days") if isinstance(loaded, dict) else {}
if not isinstance(legacy_days, dict):
return days
merged = dict(days)
for day, block in legacy_days.items():
if day in merged:
continue
if isinstance(block, dict) and block.get("accounts"):
merged[day] = block
return merged
except Exception:
return days
def _load_store() -> dict:
if not FUND_HISTORY_PATH.is_file():
store = {"version": 1, "days": _migrate_legacy_store({})}
if store["days"]:
_atomic_write(FUND_HISTORY_PATH, store)
return store
try:
loaded = json.loads(FUND_HISTORY_PATH.read_text(encoding="utf-8"))
if isinstance(loaded, dict):
loaded.setdefault("version", 1)
days = dict(loaded.get("days") or {})
loaded["days"] = _migrate_legacy_store(days)
return loaded
except Exception:
pass
return {"version": 1, "days": {}}
def record_fund_snapshot(
trading_day: str,
accounts: list[dict],
*,
keep_days: int = FUND_HISTORY_DAYS,
reset_hour: int = 8,
) -> dict[str, Any]:
"""写入当日各户资金账户/交易账户余额,并裁剪历史。"""
day = (trading_day or "").strip()[:10] or current_trading_day(reset_hour=reset_hour)
store = _load_store()
days = dict(store.get("days") or {})
row_accounts: dict[str, dict] = {}
for ac in accounts or []:
key = str(ac.get("key") or ac.get("id") or "").strip()
if not key:
continue
if not ac.get("monitored"):
continue
fu = _safe_float(ac.get("funding_usdt"))
tu = _safe_float(ac.get("trading_usdt"))
total = account_total_usdt(fu, tu)
if total is None:
continue
row_accounts[key] = {
"name": ac.get("name"),
"funding_usdt": fu,
"trading_usdt": tu,
"total_usdt": total,
"recorded_at": _now_str(),
}
if row_accounts:
days[day] = {"accounts": row_accounts, "updated_at": _now_str()}
days = _prune_days(days, keep_days=keep_days, anchor_day=day)
_atomic_write(FUND_HISTORY_PATH, {"version": 1, "days": days})
return days
def record_fund_snapshot_from_board(
rows: list[dict],
*,
keep_days: int = FUND_HISTORY_DAYS,
reset_hour: int = 8,
) -> dict[str, Any]:
"""监控板行写入当日快照(仅 account_ok 且资金/交易户齐全)。"""
day = current_trading_day(reset_hour=reset_hour)
accounts = []
for row in rows or []:
if not isinstance(row, dict):
continue
if not row.get("account_ok"):
continue
accounts.append(
{
"key": row.get("key") or row.get("id"),
"name": row.get("name"),
"funding_usdt": row.get("funding_usdt"),
"trading_usdt": row.get("trading_usdt"),
"monitored": True,
}
)
return record_fund_snapshot(day, accounts, keep_days=keep_days, reset_hour=reset_hour)
def get_fund_history(*, anchor_day: str, keep_days: int = FUND_HISTORY_DAYS) -> dict[str, dict]:
store = _load_store()
return _prune_days(
dict(store.get("days") or {}),
keep_days=keep_days,
anchor_day=anchor_day,
)
def _exchange_monitored(ex: dict) -> bool:
return bool(ex.get("enabled")) and not bool(ex.get("env_disabled"))
def _live_row_for_exchange(ex: dict, rows_by_key: dict[str, dict]) -> Optional[dict]:
key = str(ex.get("key") or "").strip()
if not key:
return None
return rows_by_key.get(key)
def _series_from_history(
history: dict[str, dict],
account_keys: list[str],
) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for day in sorted(history.keys()):
block = history.get(day) or {}
ac_map = block.get("accounts") or {}
total = 0.0
n = 0
for key in account_keys:
ac = ac_map.get(key) or {}
t = account_total_usdt(ac.get("funding_usdt"), ac.get("trading_usdt"))
if t is None:
t = _safe_float(ac.get("total_usdt"))
if t is None:
continue
total += t
n += 1
if n > 0:
out.append({"day": day, "total_usdt": round(total, 4)})
return out
def _account_series(history: dict[str, dict], key: str) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for day in sorted(history.keys()):
ac = (history.get(day) or {}).get("accounts", {}).get(key) or {}
t = account_total_usdt(ac.get("funding_usdt"), ac.get("trading_usdt"))
if t is None:
t = _safe_float(ac.get("total_usdt"))
if t is None:
continue
out.append(
{
"day": day,
"total_usdt": t,
"funding_usdt": _safe_float(ac.get("funding_usdt")),
"trading_usdt": _safe_float(ac.get("trading_usdt")),
}
)
return out
def build_fund_overview(
exchanges: list[dict],
*,
board_rows: Optional[list[dict]] = None,
trading_day: Optional[str] = None,
keep_days: int = FUND_HISTORY_DAYS,
reset_hour: int = 8,
updated_at: Optional[str] = None,
) -> dict[str, Any]:
day = (trading_day or "").strip()[:10] or current_trading_day(reset_hour=reset_hour)
history = get_fund_history(anchor_day=day, keep_days=keep_days)
rows_by_key: dict[str, dict] = {}
for row in board_rows or []:
if isinstance(row, dict):
k = str(row.get("key") or "").strip()
if k:
rows_by_key[k] = row
monitored_keys: list[str] = []
accounts_out: list[dict[str, Any]] = []
live_total = 0.0
live_known = 0
for ex in exchanges or []:
key = str(ex.get("key") or "").strip()
monitored = _exchange_monitored(ex)
row = _live_row_for_exchange(ex, rows_by_key) if monitored else None
fu = tu = total = None
data_ok = False
if monitored and row and row.get("account_ok"):
fu = _safe_float(row.get("funding_usdt"))
tu = _safe_float(row.get("trading_usdt"))
total = account_total_usdt(fu, tu)
data_ok = total is not None
if data_ok:
live_total += total
live_known += 1
series = _account_series(history, key) if monitored and key else []
dd = compute_drawdown([p["total_usdt"] for p in series]) if series else {
"peak_usdt": None,
"max_drawdown_u": None,
"max_drawdown_pct": None,
}
day_delta = None
if series:
if len(series) >= 2:
day_delta = round(series[-1]["total_usdt"] - series[-2]["total_usdt"], 4)
elif data_ok and total is not None:
day_delta = round(total - series[-1]["total_usdt"], 4)
accounts_out.append(
{
"id": ex.get("id"),
"key": key,
"name": ex.get("name") or key,
"monitored": monitored,
"data_ok": data_ok,
"funding_usdt": fu,
"trading_usdt": tu,
"total_usdt": total,
"series": series,
"drawdown": dd,
"day_delta_usdt": day_delta,
}
)
if monitored and key:
monitored_keys.append(key)
total_series = _series_from_history(history, monitored_keys)
if live_known > 0:
last_day = total_series[-1]["day"] if total_series else None
live_point = round(live_total, 4)
if last_day == day and total_series:
total_series[-1]["total_usdt"] = live_point
total_series[-1]["live"] = True
else:
total_series.append({"day": day, "total_usdt": live_point, "live": True})
total_dd = compute_drawdown([p["total_usdt"] for p in total_series]) if total_series else {
"peak_usdt": None,
"max_drawdown_u": None,
"max_drawdown_pct": None,
}
total_day_delta = None
if total_series:
if len(total_series) >= 2:
total_day_delta = round(
total_series[-1]["total_usdt"] - total_series[-2]["total_usdt"], 4
)
return {
"ok": True,
"trading_day": day,
"reset_hour": reset_hour,
"keep_days": keep_days,
"updated_at": updated_at,
"totals": {
"monitored_count": len(monitored_keys),
"live_known_count": live_known,
"total_usdt": round(live_total, 4) if live_known > 0 else None,
"day_delta_usdt": total_day_delta,
"series": total_series,
"drawdown": total_dd,
},
"accounts": accounts_out,
}
def format_fund_history_text(
history: dict[str, dict],
*,
account_names: Optional[dict[str, str]] = None,
) -> str:
if not history:
return "(暂无资金历史快照)"
names = account_names or {}
lines = ["【资金快照(资金账户 + 交易账户 USDT)】"]
for day in sorted(history.keys()):
block = history.get(day) or {}
ac_map = block.get("accounts") or {}
if not ac_map:
continue
parts = []
for key, ac in ac_map.items():
label = names.get(key) or ac.get("name") or key
fu = ac.get("funding_usdt")
tu = ac.get("trading_usdt")
tot = ac.get("total_usdt")
if tot is None:
tot = account_total_usdt(fu, tu)
fu_txt = f"{fu}U" if fu is not None else "未知"
tu_txt = f"{tu}U" if tu is not None else "未知"
tot_txt = f"{tot}U" if tot is not None else "未知"
parts.append(f"{label}: 合计{tot_txt}(资金{fu_txt}/交易{tu_txt}")
lines.append(f"- {day}: " + "".join(parts))
return "\n".join(lines) if len(lines) > 1 else "(暂无资金历史快照)"