Files
crypto_monitor/manual_trading_hub/hub_ai/context.py
T
dekun a5f5239be9 feat(hub): render AI summary account breakdown as icon table
Replace pipe-separated account lines with a structured table from stats_snapshot, including exchange icons and position remarks.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-07 00:28:24 +08:00

301 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控 AI:四户数据聚合为结构化上下文。"""
from __future__ import annotations
import hashlib
import json
import os
from typing import Any, Callable, Optional
import httpx
from hub_ai.config import hub_agent_timeout, hub_flask_timeout, trading_day_reset_hour
from hub_trades_lib import current_trading_day, summarize_trades
def _hub_token() -> str:
    """Return the hub token read from the environment, whitespace-stripped.

    ``HUB_BRIDGE_TOKEN`` is consulted first and ``CONTROL_TOKEN`` second; the
    first non-empty value wins. An empty string is returned when neither
    variable holds a value.
    """
    for env_name in ("HUB_BRIDGE_TOKEN", "CONTROL_TOKEN"):
        raw = os.getenv(env_name)
        if raw:
            return raw.strip()
    return ""
def _hub_headers() -> dict[str, str]:
    """Build the request headers carrying the hub token.

    Returns ``{"X-Hub-Token": <token>}`` when ``_hub_token()`` yields a
    non-empty value, otherwise an empty dict.
    """
    token = _hub_token()
    if not token:
        return {}
    return {"X-Hub-Token": token}
def _agent_headers() -> dict[str, str]:
    """Build the request headers carrying the agent control token.

    The token comes from ``CONTROL_TOKEN`` first and ``HUB_BRIDGE_TOKEN``
    second (the reverse of the hub token lookup order). Returns
    ``{"X-Control-Token": <token>}`` or an empty dict when no token is set.
    """
    candidates = (os.getenv("CONTROL_TOKEN"), os.getenv("HUB_BRIDGE_TOKEN"))
    token = next((value for value in candidates if value), "").strip()
    if not token:
        return {}
    return {"X-Control-Token": token}
def _safe_float(v: Any) -> Optional[float]:
    """Convert *v* to ``float``, returning ``None`` when that is not possible.

    ``None`` and the empty string map to ``None``; so does any value for
    which the conversion raises ``TypeError`` or ``ValueError``.
    """
    try:
        # The blank check stays inside the ``try`` so that a comparison which
        # itself raises is handled the same way as a failed conversion.
        blank = v is None or v == ""
        return None if blank else float(v)
    except (TypeError, ValueError):
        return None
def _position_float_pnl(pos: dict) -> float:
    """Return the unrealized PnL of one position dict.

    The keys ``unrealized_pnl``, ``unrealizedPnl`` and ``upnl`` are tried in
    that order; the first one that converts to a float is returned. ``0.0``
    is returned when none of them does.
    """
    # Lazy generator: later keys are only looked at if earlier ones fail.
    parsed = (
        _safe_float(pos.get(field))
        for field in ("unrealized_pnl", "unrealizedPnl", "upnl")
    )
    return next((value for value in parsed if value is not None), 0.0)
def _collect_open_issues(
    *,
    monitored: bool,
    agent_ok: bool,
    flask_ok: bool,
    positions: list,
    hub_mon: Optional[dict],
    day_pnl: float,
) -> list[str]:
    """List the attention points for one account.

    Args:
        monitored: When false, no checks run and an empty list is returned.
        agent_ok: Whether the agent request succeeded.
        flask_ok: Whether a Flask monitor request succeeded.
        positions: Position entries; only ``dict`` items count towards the
            floating PnL sum.
        hub_mon: Monitor payload, or ``None``. It is ignored when it is not a
            dict or when its ``ok`` field is ``False``.
        day_pnl: Realized PnL for the trading day.

    Returns:
        Messages in a fixed order: agent connectivity, Flask connectivity,
        day loss (below -0.01), floating loss (below -0.5), and positions
        that have neither monitor orders nor trend plans.
    """
    if not monitored:
        return []

    found: list[str] = []
    if not agent_ok:
        found.append("Agent 连接异常")
    if not flask_ok:
        found.append("Flask 监控连接异常")
    if day_pnl < -0.01:
        found.append(f"当日平仓亏损 {day_pnl:.2f}U")

    unrealized = sum(
        _position_float_pnl(entry) for entry in positions if isinstance(entry, dict)
    )
    if unrealized < -0.5:
        found.append(f"当前浮亏 {unrealized:.2f}U")

    monitor_usable = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False
    if monitor_usable and positions:
        has_local_plan = bool(hub_mon.get("orders") or hub_mon.get("trends"))
        if not has_local_plan:
            found.append("交易所有持仓但无本地 active 监控/趋势计划")
    return found
def _fetch_account_bundle(client: httpx.Client, ex: dict, trading_day: str) -> dict[str, Any]:
    """Gather one exchange account's live data into a single bundle dict.

    An account counts as monitored when ``enabled`` is truthy and
    ``env_disabled`` is falsy. For a monitored account up to three GET
    requests are issued: ``{agent_url}/status``,
    ``{flask_url}/api/hub/trades/today`` and ``{flask_url}/api/hub/monitor``.
    Request failures are recorded in ``issues`` instead of being raised, so
    one unreachable account cannot abort the caller.

    Args:
        client: HTTP client used for every request.
        ex: Exchange config. Keys read: ``id``, ``key``, ``name``,
            ``enabled``, ``env_disabled``, ``agent_url``, ``flask_url``.
        trading_day: Sent as the ``trading_day`` query parameter of the
            trades request.

    Returns:
        Dict with the keys ``id``, ``key``, ``name``, ``enabled``,
        ``env_disabled``, ``status``, ``trades``, ``trade_stats``,
        ``positions``, ``float_pnl_u``, ``balance_usdt``, ``issues``,
        ``agent_ok``, ``flask_ok``, ``hub_monitor``, ``active_orders`` and
        ``active_trends``. Unmonitored accounts get the defaults only.
    """
    name = ex.get("name") or ex.get("key") or ex.get("id")
    key = ex.get("key") or ""
    enabled = bool(ex.get("enabled"))
    env_disabled = bool(ex.get("env_disabled"))
    monitored = enabled and not env_disabled
    base: dict[str, Any] = {
        "id": ex.get("id"),
        "key": key,
        "name": name,
        "enabled": enabled,
        "env_disabled": env_disabled,
        "status": "未监控" if not monitored else "已监控",
        "trades": [],
        "trade_stats": summarize_trades([]),
        "positions": [],
        "float_pnl_u": 0.0,
        "balance_usdt": None,
        "issues": [],
        "agent_ok": False,
        "flask_ok": False,
        "hub_monitor": None,
        "active_orders": 0,
        "active_trends": 0,
    }
    if not monitored:
        return base

    agent_url = (ex.get("agent_url") or "").rstrip("/")
    flask_url = (ex.get("flask_url") or "").rstrip("/")

    # --- Agent: balance and open positions -------------------------------
    agent_body = None
    if agent_url:
        try:
            r = client.get(
                f"{agent_url}/status",
                headers=_agent_headers(),
                timeout=hub_agent_timeout(),
            )
            if r.status_code == 200:
                agent_body = r.json()
                base["agent_ok"] = True
        except Exception as exc:  # best-effort: record and keep going
            base["issues"].append(f"Agent: {exc}")
    if isinstance(agent_body, dict):
        base["balance_usdt"] = _safe_float(agent_body.get("balance_usdt"))
        positions = agent_body.get("positions") or []
        if isinstance(positions, list):
            base["positions"] = positions
            base["float_pnl_u"] = round(
                sum(_position_float_pnl(p) for p in positions if isinstance(p, dict)), 4
            )

    # --- Flask: today's closed trades and monitor state ------------------
    hub_mon = None
    if flask_url:
        try:
            r = client.get(
                f"{flask_url}/api/hub/trades/today",
                headers=_hub_headers(),
                params={"trading_day": trading_day},
                timeout=hub_flask_timeout(),
            )
            if r.status_code == 200:
                trades_body = r.json()
                if isinstance(trades_body, dict) and trades_body.get("ok"):
                    # Only accept the shapes later code relies on: a list of
                    # trades and a dict of stats. Anything else falls back to
                    # an empty list / locally computed stats.
                    trades = trades_body.get("trades")
                    stats = trades_body.get("stats")
                    base["trades"] = trades if isinstance(trades, list) else []
                    if isinstance(stats, dict) and stats:
                        base["trade_stats"] = stats
                    else:
                        base["trade_stats"] = summarize_trades(base["trades"])
                    base["flask_ok"] = True
        except Exception as exc:  # best-effort: record and keep going
            base["issues"].append(f"成交接口: {exc}")
        try:
            r = client.get(
                f"{flask_url}/api/hub/monitor",
                headers=_hub_headers(),
                timeout=hub_flask_timeout(),
            )
            if r.status_code == 200:
                hub_mon = r.json()
                if isinstance(hub_mon, dict) and hub_mon.get("ok") is not False:
                    base["hub_monitor"] = hub_mon
                    base["flask_ok"] = True
                    base["active_orders"] = len(hub_mon.get("orders") or [])
                    base["active_trends"] = len(hub_mon.get("trends") or [])
        except Exception as exc:  # best-effort: record and keep going
            # Skip the second Flask error when the trades request already
            # reported one, so the same outage is not listed twice.
            if not any("成交接口" in str(issue) for issue in base["issues"]):
                base["issues"].append(f"监控接口: {exc}")

    # NOTE(review): status is derived from request errors only, before the
    # open-issue checks below are appended — confirm this ordering is intended.
    if not base["agent_ok"] and not base["flask_ok"]:
        base["status"] = "连接异常"
    elif base["issues"]:
        base["status"] = "已监控·需关注"

    # A non-numeric ``total_pnl_u`` from the remote must not crash the whole
    # aggregation; treat it as zero.
    stats_for_pnl = base["trade_stats"] if isinstance(base["trade_stats"], dict) else {}
    day_pnl = _safe_float(stats_for_pnl.get("total_pnl_u")) or 0.0
    base["issues"].extend(
        _collect_open_issues(
            monitored=monitored,
            agent_ok=base["agent_ok"],
            flask_ok=base["flask_ok"],
            positions=base["positions"],
            hub_mon=hub_mon if isinstance(hub_mon, dict) else None,
            day_pnl=day_pnl,
        )
    )
    # De-duplicate while keeping first-seen order.
    base["issues"] = list(dict.fromkeys(base["issues"]))
    return base
def build_daily_context(
    exchanges: list[dict],
    *,
    trading_day: Optional[str] = None,
) -> dict[str, Any]:
    """Aggregate every account into the daily context payload.

    Args:
        exchanges: Exchange configs, one per account; ``None`` or empty is
            treated as no accounts.
        trading_day: Day to report on. Only the first 10 characters of the
            stripped value are used; when blank, ``current_trading_day`` is
            called with the configured reset hour.

    Returns:
        Dict with ``trading_day``, ``totals``, ``accounts``, ``text`` (the
        rendering from ``format_context_text``) and ``context_hash`` (the
        first 16 hex characters of the SHA-256 of ``text``). Accounts whose
        status is "未监控" are listed but excluded from ``totals``.
    """
    requested = (trading_day or "").strip()[:10]
    day = requested if requested else current_trading_day(
        reset_hour=trading_day_reset_hour()
    )

    with httpx.Client() as client:
        accounts: list[dict] = [
            _fetch_account_bundle(client, exchange, day) for exchange in exchanges or []
        ]

    closed_pnl_sum = 0.0
    float_pnl_sum = 0.0
    closed_n = 0
    win_n = 0
    loss_n = 0
    for account in accounts:
        if account.get("status") != "未监控":
            stats = account.get("trade_stats") or {}
            closed_pnl_sum += float(stats.get("total_pnl_u") or 0)
            closed_n += int(stats.get("closed_count") or 0)
            win_n += int(stats.get("win_count") or 0)
            loss_n += int(stats.get("loss_count") or 0)
            float_pnl_sum += float(account.get("float_pnl_u") or 0)

    totals = {
        "trading_day": day,
        "total_pnl_u": round(closed_pnl_sum, 4),
        "closed_count": closed_n,
        "win_count": win_n,
        "loss_count": loss_n,
        "float_pnl_u": round(float_pnl_sum, 4),
    }
    payload = {"trading_day": day, "totals": totals, "accounts": accounts}
    text = format_context_text(payload)
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
    return {**payload, "text": text, "context_hash": digest}
def format_context_text(payload: dict) -> str:
    """Render the aggregated payload as plain text.

    The output starts with one totals line, followed by one section per
    account. Accounts whose status is "未监控" get only their header and
    status lines. At most 8 positions and 15 trades are listed per account.

    Args:
        payload: Dict with optional ``totals`` and ``accounts`` entries, as
            assembled by ``build_daily_context``.

    Returns:
        The rendered text with leading and trailing whitespace stripped.
    """
    lines: list[str] = []
    totals = payload.get("totals") or {}
    # The win/loss group is closed with ")" so the parentheses balance.
    lines.append(
        f"【合计】交易日 {totals.get('trading_day')} | "
        f"平仓盈亏 {totals.get('total_pnl_u')}U | "
        f"笔数 {totals.get('closed_count')}(胜{totals.get('win_count')}/负{totals.get('loss_count')}) | "
        f"监控户浮盈亏合计 {totals.get('float_pnl_u')}U"
    )
    lines.append("")
    for ac in payload.get("accounts") or []:
        st = ac.get("trade_stats") or {}
        lines.append(f"--- 账户:{ac.get('name')} ({ac.get('key')}) ---")
        lines.append(f"状态:{ac.get('status')}")
        if ac.get("status") == "未监控":
            lines.append("")
            continue
        lines.append(
            f"当日平仓:{st.get('closed_count')} 笔,盈亏 {st.get('total_pnl_u')}U "
            f"(胜{st.get('win_count')}/负{st.get('loss_count')})"
        )
        balance = ac.get("balance_usdt")
        lines.append(f"合约可用余额:{balance if balance is not None else '未知'} USDT")
        lines.append(
            f"当前持仓浮盈亏:{ac.get('float_pnl_u')}U | "
            f"下单监控 {ac.get('active_orders')} | 趋势计划 {ac.get('active_trends')}"
        )
        positions = ac.get("positions") or []
        if positions:
            lines.append("持仓:")
            for p in positions[:8]:
                if not isinstance(p, dict):
                    continue
                sym = p.get("symbol") or "?"
                side = p.get("side") or "?"
                contracts = p.get("contracts") or p.get("size") or "?"
                upnl = _position_float_pnl(p)
                lines.append(f" - {sym} {side} 张数{contracts} 浮盈亏{upnl:.4f}U")
        trades = ac.get("trades") or []
        if trades:
            lines.append("当日平仓明细:")
            for t in trades[:15]:
                # Skip malformed rows, the same way position rows are guarded.
                if not isinstance(t, dict):
                    continue
                lines.append(
                    f" - {t.get('symbol')} {t.get('direction')} {t.get('result')} "
                    f"{t.get('pnl_amount')}U @ {t.get('closed_at') or '?'}"
                )
        issues = ac.get("issues") or []
        if issues:
            # An explicit separator keeps individual issues readable.
            lines.append("关注点:" + "; ".join(str(item) for item in issues))
        lines.append("")
    return "\n".join(lines).strip()
def format_account_remark(ac: dict) -> str:
    """Build the remark cell for one account row: positions or issues.

    When the account has positions, up to three of them are summarized and a
    trailing "另有N" part reports how many more exist. Without positions the
    first two issues are shown. Parts are joined with "; ".

    Args:
        ac: Account bundle; the ``positions`` and ``issues`` keys are read.

    Returns:
        The remark text, or an empty string when there is nothing to show.
    """
    positions = ac.get("positions") or []
    if not positions:
        issues = ac.get("issues") or []
        # An explicit separator keeps the two issues from running together.
        return "; ".join(str(x) for x in issues[:2])

    parts: list[str] = []
    for p in positions[:3]:
        if not isinstance(p, dict):
            continue
        sym = p.get("symbol") or "?"
        side = p.get("side") or "?"
        contracts = p.get("contracts") if p.get("contracts") is not None else p.get("size")
        if contracts is None:
            # Same "?" placeholder the full context text uses, not "None".
            contracts = "?"
        upnl = _position_float_pnl(p)
        parts.append(f"持仓: {sym} {side} 张数{contracts} 浮盈亏{upnl:.4f}U")
    if len(positions) > 3:
        parts.append(f"另有{len(positions) - 3}")
    return "; ".join(parts)
def format_chat_context_brief(payload: dict, max_chars: int = 2500) -> str:
    """Return the context text, truncated to at most *max_chars* characters.

    Text that already fits is returned unchanged. Longer text is cut, right-
    stripped and suffixed with "..." so the result stays within the limit.

    Args:
        payload: Passed through to ``format_context_text``.
        max_chars: Upper bound on the returned length. Values too small to
            hold the "..." suffix yield a plain cut with no suffix; zero or
            negative values yield an empty string.

    Returns:
        The full or truncated context text.
    """
    text = format_context_text(payload)
    if len(text) <= max_chars:
        return text
    suffix = "..."
    if max_chars < len(suffix):
        # ``max_chars - 3`` would be negative here and slice from the end of
        # the string, returning far more than the limit allows.
        return text[: max(max_chars, 0)]
    return text[: max_chars - len(suffix)].rstrip() + suffix