"""中控 AI:四户数据聚合为结构化上下文。""" from __future__ import annotations import hashlib import json import os import re import time from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta from threading import Lock from typing import Any, Optional import httpx from hub_ai.config import ( CHAT_CONTEXT_MAX_CHARS, FUND_HISTORY_DAYS, hub_agent_timeout, hub_flask_timeout, trading_day_reset_hour, ) from hub_ai.fund_history import format_fund_history_text, get_fund_history, record_fund_snapshot from hub_trades_lib import current_trading_day, summarize_trades _CHAT_CONTEXT_CACHE: dict[str, dict[str, Any]] = {} _CHAT_CONTEXT_CACHE_LOCK = Lock() _HUB_TPSL_MERGE_FN: Any = None def _chat_context_cache_ttl_sec() -> float: try: return float(os.getenv("CHAT_CONTEXT_CACHE_TTL_SEC", "45") or "45") except ValueError: return 45.0 def _hub_token() -> str: return (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip() def _hub_headers() -> dict[str, str]: tok = _hub_token() return {"X-Hub-Token": tok} if tok else {} def _agent_headers() -> dict[str, str]: tok = (os.getenv("CONTROL_TOKEN") or os.getenv("HUB_BRIDGE_TOKEN") or "").strip() return {"X-Control-Token": tok} if tok else {} def _safe_float(v: Any) -> Optional[float]: try: if v is None or v == "": return None return float(v) except (TypeError, ValueError): return None def _position_contracts(p: dict) -> float: for key in ("contracts", "contracts_signed", "size"): v = p.get(key) try: if v is not None and v != "": return float(v) except (TypeError, ValueError): continue return 0.0 def _filter_open_positions(positions: list) -> list[dict]: out: list[dict] = [] for p in positions or []: if not isinstance(p, dict): continue if abs(_position_contracts(p)) < 1e-12: continue out.append(p) return out def _account_open_position_count(ac: dict) -> int: return len(_filter_open_positions(ac.get("positions") or [])) def _monitor_counts(ac: dict) -> dict[str, int]: mon = ac.get("monitor_lines") or {} return { "trends": len(mon.get("trends") or []), "rolls": len(mon.get("rolls") or []), "keys": len(mon.get("keys") or []), "orders": len(mon.get("orders") or []), } def _position_float_pnl(pos: dict) -> float: for key in ("unrealized_pnl", "unrealizedPnl", "upnl"): v = _safe_float(pos.get(key)) if v is not None: return v return 0.0 def _collect_open_issues( *, monitored: bool, agent_ok: bool, flask_ok: bool, positions: list, hub_mon: Optional[dict], day_pnl: float, ) -> list[str]: issues: list[str] = [] if not monitored: return issues if not agent_ok: issues.append("Agent 连接异常") if not flask_ok: issues.append("Flask 监控连接异常") if day_pnl < -0.01: issues.append(f"当日平仓亏损 {day_pnl:.2f}U") open_positions = _filter_open_positions(positions) float_pnl = sum(_position_float_pnl(p) for p in open_positions) if float_pnl < -0.5: issues.append(f"当前浮亏 {float_pnl:.2f}U") if isinstance(hub_mon, dict) and hub_mon.get("ok") is not False: orders = hub_mon.get("orders") or [] trends = hub_mon.get("trends") or [] if open_positions and not orders and not trends: issues.append("交易所有持仓但无本地 active 监控/趋势计划") return issues def previous_trading_day(trading_day: str) -> str: day = (trading_day or "").strip()[:10] if not day: return day dt = datetime.strptime(day, "%Y-%m-%d") return (dt - timedelta(days=1)).strftime("%Y-%m-%d") def _fmt_fund(v: Any) -> str: n = _safe_float(v) if n is None: return "未知" return f"{n:.2f}U" def _format_trade_line(t: dict, *, day_label: str = "") -> str: prefix = f"[{day_label}] " if day_label else "" return ( f"{prefix}{t.get('symbol')} {t.get('direction')} {t.get('result')} " f"{t.get('pnl_amount')}U @ {t.get('closed_at') or '?'}" ) def _monitor_label(item: dict, default: str = "") -> str: for key in ("monitor_type_label", "monitor_type", "entry_reason", "source_label"): val = item.get(key) if val: return str(val) return default def _format_monitor_sections(hub_mon: Optional[dict]) -> dict[str, list[str]]: out = {"trends": [], "orders": [], "keys": [], "rolls": []} if not isinstance(hub_mon, dict) or hub_mon.get("ok") is False: return out for t in hub_mon.get("trends") or []: if not isinstance(t, dict): continue out["trends"].append( f"{t.get('symbol')} {t.get('direction')} " f"SL={t.get('stop_loss')} TP={t.get('take_profit')} " f"补仓区[{t.get('add_lower')}~{t.get('add_upper')}] " f"状态={t.get('status')}" ) for o in hub_mon.get("orders") or []: if not isinstance(o, dict): continue label = _monitor_label(o, "下单监控") out["orders"].append( f"{label}: {o.get('symbol')} {o.get('direction')} " f"触发={o.get('trigger_price')} SL={o.get('stop_loss')} TP={o.get('take_profit')} " f"状态={o.get('status')}" ) for k in hub_mon.get("keys") or []: if not isinstance(k, dict): continue out["keys"].append( f"关键位: {k.get('symbol')} {k.get('direction')} " f"上={k.get('upper')} 下={k.get('lower')} 类型={k.get('monitor_type')}" ) for r in hub_mon.get("rolls") or []: if not isinstance(r, dict): continue out["rolls"].append( f"顺势加仓: {r.get('symbol')} {r.get('direction')} " f"腿数={r.get('leg_count')} SL={r.get('current_stop_loss') or r.get('initial_stop_loss')} " f"状态={r.get('status')}" ) return out _SL_TP_COMBO_RE = re.compile(r"SL=([\d.eE+-]+).*TP=([\d.eE+-]+)", re.I) def _norm_symbol(sym: str) -> str: s = (sym or "").strip().upper() if "/" in s: s = s.split(":")[0].split("/")[0] return s def _symbols_match(a: str, b: str) -> bool: na, nb = _norm_symbol(a), _norm_symbol(b) return bool(na and nb and na == nb) def _pick_tpsl_from_cond(cond: list) -> tuple[Optional[float], Optional[float]]: sl = tp = None if not cond: return sl, tp sl_o = tp_o = combo = None for o in cond: if not isinstance(o, dict): continue lbl = str(o.get("label") or "") if "止盈止损" in lbl: combo = o elif lbl.startswith("止损"): sl_o = o elif lbl.startswith("止盈"): tp_o = o if combo: lbl = str(combo.get("label") or "") m = _SL_TP_COMBO_RE.search(lbl) if m: sl = _safe_float(m.group(1)) tp = _safe_float(m.group(2)) if sl_o and sl is None: sl = _safe_float(sl_o.get("trigger_price")) if tp_o and tp is None: tp = _safe_float(tp_o.get("trigger_price")) if sl is None: for o in cond: if not isinstance(o, dict): continue lbl = str(o.get("label") or "") if "止损" in lbl and "止盈止损" not in lbl: sl = _safe_float(o.get("trigger_price")) if sl is not None: break if tp is None: for o in cond: if not isinstance(o, dict): continue lbl = str(o.get("label") or "") if lbl.startswith("止盈") or ("止盈" in lbl and "止盈止损" not in lbl): tp = _safe_float(o.get("trigger_price")) if tp is not None: break return sl, tp def _pick_tpsl_from_exchange_tpsl(et: Any) -> tuple[Optional[float], Optional[float]]: if not isinstance(et, dict): return None, None sl = tp = None slot_sl = et.get("sl") slot_tp = et.get("tp") if isinstance(slot_sl, dict): sl = _safe_float(slot_sl.get("trigger_price")) if isinstance(slot_tp, dict): tp = _safe_float(slot_tp.get("trigger_price")) return sl, tp def _find_plan_tpsl_for_position( symbol: str, side: str, hub_mon: Optional[dict], ) -> tuple[Optional[float], Optional[float], bool]: """匹配本地监控/趋势计划:sl, tp, tp_is_program_monitored。""" if not isinstance(hub_mon, dict): return None, None, False side_l = (side or "").lower() for o in hub_mon.get("orders") or []: if not isinstance(o, dict): continue o_sym = o.get("exchange_symbol") or o.get("symbol") or "" if not _symbols_match(symbol, o_sym): continue if (o.get("direction") or "").lower() != side_l: continue return ( _safe_float(o.get("stop_loss")), _safe_float(o.get("take_profit")), False, ) for t in hub_mon.get("trends") or []: if not isinstance(t, dict): continue if not _symbols_match(symbol, t.get("symbol") or ""): continue if (t.get("direction") or "").lower() != side_l: continue plan_tp = t.get("take_profit") tp = _safe_float(plan_tp) if plan_tp not in (None, "") else None return _safe_float(t.get("stop_loss")), tp, tp is None return None, None, False def _resolve_position_tpsl(pos: dict, hub_mon: Optional[dict]) -> dict[str, Any]: cond = pos.get("conditional_orders") or [] cond_sl, cond_tp = _pick_tpsl_from_cond(cond) et_sl, et_tp = _pick_tpsl_from_exchange_tpsl(pos.get("exchange_tpsl")) plan_sl, plan_tp, tp_monitored = _find_plan_tpsl_for_position( str(pos.get("symbol") or ""), str(pos.get("side") or ""), hub_mon, ) sl = cond_sl if cond_sl is not None else et_sl if et_sl is not None else plan_sl tp_note = "" tp: Optional[float] = None if tp_monitored and cond_tp is None and et_tp is None: tp_note = "程序监控" else: tp = cond_tp if cond_tp is not None else et_tp if et_tp is not None else plan_tp if sl is not None and tp is not None and sl == tp: tp = None return {"sl": sl, "tp": tp, "tp_note": tp_note} def _format_position_detail_line(pos: dict, hub_mon: Optional[dict]) -> str: sym = pos.get("symbol") or "?" side = pos.get("side") or "?" contracts = pos.get("contracts") or pos.get("size") or "?" upnl = _position_float_pnl(pos) entry = _safe_float(pos.get("entry_price")) tpsl = _resolve_position_tpsl(pos, hub_mon) parts = [f"{sym} {side} 张数{contracts}"] if entry is not None: parts.append(f"入场{entry:g}") if tpsl["sl"] is not None: parts.append(f"止损{tpsl['sl']:g}") else: parts.append("止损=未检测到") if tpsl["tp_note"]: parts.append(f"止盈={tpsl['tp_note']}") elif tpsl["tp"] is not None: parts.append(f"止盈{tpsl['tp']:g}") else: parts.append("止盈=未检测到") parts.append(f"浮盈亏{upnl:.4f}U") return " - " + " ".join(parts) def _enrich_positions_exchange_tpsl( positions: list, price_snap: Optional[dict], hub_mon: Optional[dict], ) -> None: global _HUB_TPSL_MERGE_FN if not positions: return if _HUB_TPSL_MERGE_FN is None: try: from hub import _merge_flask_exchange_tpsl _HUB_TPSL_MERGE_FN = _merge_flask_exchange_tpsl except Exception: _HUB_TPSL_MERGE_FN = False if not _HUB_TPSL_MERGE_FN: return try: _HUB_TPSL_MERGE_FN( {"agent": {"positions": positions}}, price_snap if isinstance(price_snap, dict) else None, hub_mon if isinstance(hub_mon, dict) else None, ) except Exception: pass def _fetch_account_bundle( client: httpx.Client, ex: dict, trading_day: str, *, for_chat: bool = False, ) -> dict[str, Any]: name = ex.get("name") or ex.get("key") or ex.get("id") key = ex.get("key") or "" enabled = bool(ex.get("enabled")) env_disabled = bool(ex.get("env_disabled")) monitored = enabled and not env_disabled base: dict[str, Any] = { "id": ex.get("id"), "key": key, "name": name, "enabled": enabled, "env_disabled": env_disabled, "status": "未监控" if not monitored else "已监控", "trades": [], "trade_stats": summarize_trades([]), "positions": [], "open_position_count": 0, "float_pnl_u": 0.0, "balance_usdt": None, "funding_usdt": None, "trading_usdt": None, "available_trading_usdt": None, "trades_yesterday": [], "trade_stats_yesterday": summarize_trades([]), "monitor_lines": {"trends": [], "orders": [], "keys": [], "rolls": []}, "issues": [], "agent_ok": False, "flask_ok": False, "hub_monitor": None, "active_orders": 0, "active_trends": 0, } if not monitored: base["issues"] = [] return base agent_url = (ex.get("agent_url") or "").rstrip("/") flask_url = (ex.get("flask_url") or "").rstrip("/") agent_body = None if agent_url: try: r = client.get( f"{agent_url}/status", headers=_agent_headers(), timeout=hub_agent_timeout(), ) if r.status_code == 200: agent_body = r.json() base["agent_ok"] = True except Exception as exc: base["issues"].append(f"Agent: {exc}") if isinstance(agent_body, dict): base["balance_usdt"] = _safe_float(agent_body.get("balance_usdt")) positions = agent_body.get("positions") or [] if isinstance(positions, list): open_positions = _filter_open_positions(positions) base["positions"] = open_positions base["open_position_count"] = len(open_positions) base["float_pnl_u"] = round(sum(_position_float_pnl(p) for p in open_positions), 4) hub_mon = None price_snap = None prev_day = previous_trading_day(trading_day) if flask_url: try: r = client.get( f"{flask_url}/api/hub/account", headers=_hub_headers(), timeout=hub_flask_timeout(), ) if r.status_code == 200: acct_body = r.json() if isinstance(acct_body, dict) and acct_body.get("ok"): base["funding_usdt"] = _safe_float(acct_body.get("funding_usdt")) base["trading_usdt"] = _safe_float(acct_body.get("trading_usdt")) base["available_trading_usdt"] = _safe_float(acct_body.get("available_trading_usdt")) base["flask_ok"] = True except Exception as exc: base["issues"].append(f"资金接口: {exc}") try: r = client.get( f"{flask_url}/api/hub/trades/today", headers=_hub_headers(), params={"trading_day": trading_day}, timeout=hub_flask_timeout(), ) if r.status_code == 200: trades_body = r.json() if isinstance(trades_body, dict) and trades_body.get("ok"): base["trades"] = trades_body.get("trades") or [] base["trade_stats"] = trades_body.get("stats") or summarize_trades(base["trades"]) base["flask_ok"] = True except Exception as exc: base["issues"].append(f"成交接口: {exc}") if prev_day and not for_chat: try: r = client.get( f"{flask_url}/api/hub/trades/today", headers=_hub_headers(), params={"trading_day": prev_day}, timeout=hub_flask_timeout(), ) if r.status_code == 200: y_body = r.json() if isinstance(y_body, dict) and y_body.get("ok"): base["trades_yesterday"] = y_body.get("trades") or [] base["trade_stats_yesterday"] = y_body.get("stats") or summarize_trades( base["trades_yesterday"] ) base["flask_ok"] = True except Exception as exc: base["issues"].append(f"昨日成交: {exc}") try: r = client.get( f"{flask_url}/api/hub/monitor", headers=_hub_headers(), timeout=hub_flask_timeout(), ) if r.status_code == 200: hub_mon = r.json() if isinstance(hub_mon, dict) and hub_mon.get("ok") is not False: base["hub_monitor"] = hub_mon base["flask_ok"] = True base["active_orders"] = len(hub_mon.get("orders") or []) base["active_trends"] = len(hub_mon.get("trends") or []) base["monitor_lines"] = _format_monitor_sections(hub_mon) except Exception as exc: if "成交接口" not in str(base["issues"]): base["issues"].append(f"监控接口: {exc}") try: r = client.get( f"{flask_url}/api/price_snapshot", headers=_hub_headers(), timeout=hub_flask_timeout(), ) if r.status_code == 200: body = r.json() if isinstance(body, dict): price_snap = body base["flask_ok"] = True except Exception: pass if base["positions"]: _enrich_positions_exchange_tpsl(base["positions"], price_snap, hub_mon) if monitored and not base["agent_ok"] and not base["flask_ok"]: base["status"] = "连接异常" elif base["issues"]: base["status"] = "已监控·需关注" day_pnl = float((base.get("trade_stats") or {}).get("total_pnl_u") or 0) base["issues"].extend( _collect_open_issues( monitored=monitored, agent_ok=base["agent_ok"], flask_ok=base["flask_ok"], positions=base["positions"], hub_mon=hub_mon if isinstance(hub_mon, dict) else None, day_pnl=day_pnl, ) ) base["issues"] = list(dict.fromkeys(base["issues"])) return base def _fetch_account_bundle_isolated(ex: dict, trading_day: str, *, for_chat: bool) -> dict[str, Any]: with httpx.Client() as client: return _fetch_account_bundle(client, ex, trading_day, for_chat=for_chat) def build_daily_context( exchanges: list[dict], *, trading_day: Optional[str] = None, for_chat: bool = False, ) -> dict[str, Any]: day = (trading_day or "").strip()[:10] or current_trading_day( reset_hour=trading_day_reset_hour() ) ex_list = exchanges or [] if for_chat and len(ex_list) > 1: workers = min(4, len(ex_list)) with ThreadPoolExecutor(max_workers=workers) as pool: accounts = list( pool.map( lambda ex: _fetch_account_bundle_isolated(ex, day, for_chat=True), ex_list, ) ) else: with httpx.Client() as client: accounts = [ _fetch_account_bundle(client, ex, day, for_chat=for_chat) for ex in ex_list ] total_closed_pnl = 0.0 total_closed = total_win = total_loss = 0 total_float = 0.0 total_funding = 0.0 total_trading = 0.0 total_open_positions = 0 funding_known = trading_known = 0 for ac in accounts: if ac.get("status") == "未监控": continue st = ac.get("trade_stats") or {} total_closed_pnl += float(st.get("total_pnl_u") or 0) total_closed += int(st.get("closed_count") or 0) total_win += int(st.get("win_count") or 0) total_loss += int(st.get("loss_count") or 0) total_float += float(ac.get("float_pnl_u") or 0) total_open_positions += int(ac.get("open_position_count") or _account_open_position_count(ac)) fu = _safe_float(ac.get("funding_usdt")) tu = _safe_float(ac.get("trading_usdt")) if fu is not None: total_funding += fu funding_known += 1 if tu is not None: total_trading += tu trading_known += 1 if not funding_known: total_funding = None if not trading_known: total_trading = None totals = { "trading_day": day, "prev_trading_day": previous_trading_day(day), "total_pnl_u": round(total_closed_pnl, 4), "closed_count": total_closed, "win_count": total_win, "loss_count": total_loss, "float_pnl_u": round(total_float, 4), "open_position_count": total_open_positions, "total_funding_usdt": round(total_funding, 4) if total_funding is not None else None, "total_trading_usdt": round(total_trading, 4) if total_trading is not None else None, } if for_chat: fund_history: list = [] fund_history_text = "" else: snap_accounts = [ { **ac, "monitored": ac.get("status") != "未监控", } for ac in accounts ] record_fund_snapshot(day, snap_accounts, keep_days=FUND_HISTORY_DAYS) fund_history = get_fund_history(anchor_day=day, keep_days=FUND_HISTORY_DAYS) account_names = {str(ac.get("key") or ac.get("id")): ac.get("name") for ac in accounts} fund_history_text = format_fund_history_text(fund_history, account_names=account_names) payload = { "trading_day": day, "prev_trading_day": previous_trading_day(day), "totals": totals, "accounts": accounts, "fund_history": fund_history, "fund_history_text": fund_history_text, } if for_chat: text = format_chat_context_for_chat(payload) else: text = format_context_text(payload) digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] return { "trading_day": day, "prev_trading_day": previous_trading_day(day), "totals": totals, "accounts": accounts, "fund_history": fund_history, "fund_history_text": fund_history_text, "text": text, "context_hash": digest, } def build_chat_context( exchanges: list[dict], *, trading_day: Optional[str] = None, force_refresh: bool = False, ) -> dict[str, Any]: """聊天专用上下文:并行拉取、跳过资金曲线/昨日成交,短 TTL 缓存。""" day = (trading_day or "").strip()[:10] or current_trading_day( reset_hour=trading_day_reset_hour() ) ttl = _chat_context_cache_ttl_sec() now = time.monotonic() if not force_refresh and ttl > 0: with _CHAT_CONTEXT_CACHE_LOCK: hit = _CHAT_CONTEXT_CACHE.get(day) if hit and (now - float(hit.get("ts") or 0)) < ttl: return hit["ctx"] ctx = build_daily_context(exchanges, trading_day=day, for_chat=True) if ttl > 0: with _CHAT_CONTEXT_CACHE_LOCK: _CHAT_CONTEXT_CACHE[day] = {"ts": now, "ctx": ctx} return ctx def format_context_text(payload: dict) -> str: lines = [] totals = payload.get("totals") or {} day = totals.get("trading_day") prev_day = totals.get("prev_trading_day") or previous_trading_day(str(day or "")) lines.append( f"【合计·今日 {day}】平仓盈亏 {totals.get('total_pnl_u')}U | " f"笔数 {totals.get('closed_count')}(胜{totals.get('win_count')}/负{totals.get('loss_count')})| " f"实盘持仓 {totals.get('open_position_count', 0)} 仓 | " f"浮盈亏 {totals.get('float_pnl_u')}U | " f"资金账户合计 {_fmt_fund(totals.get('total_funding_usdt'))} | " f"交易账户合计 {_fmt_fund(totals.get('total_trading_usdt'))}" ) lines.append( f"【对比交易日】昨日={prev_day},今日={day}。" "「持仓」= 交易所 Agent 实盘;「趋势/关键位/监控单/加仓」= 本地计划,不等于已开仓。" ) fund_txt = str(payload.get("fund_history_text") or "").strip() if fund_txt: lines.append("") lines.append(fund_txt) lines.append("") for ac in payload.get("accounts") or []: st = ac.get("trade_stats") or {} sty = ac.get("trade_stats_yesterday") or {} lines.append(f"--- 账户:{ac.get('name')} ({ac.get('key')}) ---") lines.append(f"状态:{ac.get('status')}") if ac.get("status") == "未监控": lines.append("") continue lines.append( f"资金账户 {_fmt_fund(ac.get('funding_usdt'))} | " f"交易账户 {_fmt_fund(ac.get('trading_usdt'))} | " f"可用 {_fmt_fund(ac.get('available_trading_usdt'))}" ) lines.append( f"今日({day})平仓:{st.get('closed_count')} 笔,盈亏 {st.get('total_pnl_u')}U " f"(胜{st.get('win_count')}/负{st.get('loss_count')})" ) lines.append( f"昨日({prev_day})平仓:{sty.get('closed_count')} 笔,盈亏 {sty.get('total_pnl_u')}U " f"(胜{sty.get('win_count')}/负{sty.get('loss_count')})" ) open_n = int(ac.get("open_position_count") or _account_open_position_count(ac)) if open_n <= 0: lines.append("当前交易所持仓:无(空仓)") else: lines.append( f"当前交易所持仓:{open_n} 仓 | 浮盈亏合计 {ac.get('float_pnl_u')}U" ) mon = ac.get("monitor_lines") or {} if mon.get("trends"): lines.append("趋势回调计划(本地,非持仓):") for row in mon["trends"][:8]: lines.append(f" - {row}") if mon.get("rolls"): lines.append("顺势加仓(本地,非持仓):") for row in mon["rolls"][:8]: lines.append(f" - {row}") if mon.get("keys"): lines.append("关键位监控(本地,非持仓):") for row in mon["keys"][:8]: lines.append(f" - {row}") if mon.get("orders"): lines.append("进行中的下单监控(本地,非持仓):") for row in mon["orders"][:8]: lines.append(f" - {row}") positions = ac.get("positions") or [] hub_mon = ac.get("hub_monitor") if positions: lines.append("持仓明细(交易所实盘,含止盈止损若已挂):") for p in positions[:8]: if not isinstance(p, dict): continue lines.append(_format_position_detail_line(p, hub_mon)) lines.append( f"Agent合约余额:{ac.get('balance_usdt') if ac.get('balance_usdt') is not None else '未知'} USDT" ) trades_today = ac.get("trades") or [] if trades_today: lines.append(f"今日平仓明细:") for t in trades_today[:15]: lines.append(f" - {_format_trade_line(t)}") trades_y = ac.get("trades_yesterday") or [] if trades_y: lines.append(f"昨日平仓明细:") for t in trades_y[:15]: lines.append(f" - {_format_trade_line(t)}") if not trades_today and not trades_y: lines.append("平仓明细:无") issues = ac.get("issues") or [] if issues: lines.append("关注点:" + ";".join(issues)) lines.append("") return "\n".join(lines).strip() def format_summary_context_text(payload: dict) -> str: """今日总结专用:仅当日平仓/持仓/监控,不含昨日明细与资金走势。""" lines = [] totals = payload.get("totals") or {} day = totals.get("trading_day") lines.append( f"【合计·今日 {day}】平仓盈亏 {totals.get('total_pnl_u')}U | " f"笔数 {totals.get('closed_count')}(胜{totals.get('win_count')}/负{totals.get('loss_count')})| " f"实盘持仓 {totals.get('open_position_count', 0)} 仓 | " f"浮盈亏 {totals.get('float_pnl_u')}U | " f"资金账户合计 {_fmt_fund(totals.get('total_funding_usdt'))} | " f"交易账户合计 {_fmt_fund(totals.get('total_trading_usdt'))}" ) lines.append( f"【说明】交易日={day}。" "「持仓」= 交易所 Agent 实盘;「趋势/关键位/监控单/加仓」= 本地计划,不等于已开仓。" ) lines.append("") for ac in payload.get("accounts") or []: st = ac.get("trade_stats") or {} lines.append(f"--- 账户:{ac.get('name')} ({ac.get('key')}) ---") lines.append(f"状态:{ac.get('status')}") if ac.get("status") == "未监控": lines.append("") continue lines.append( f"资金账户 {_fmt_fund(ac.get('funding_usdt'))} | " f"交易账户 {_fmt_fund(ac.get('trading_usdt'))} | " f"可用 {_fmt_fund(ac.get('available_trading_usdt'))}" ) lines.append( f"今日({day})平仓:{st.get('closed_count')} 笔,盈亏 {st.get('total_pnl_u')}U " f"(胜{st.get('win_count')}/负{st.get('loss_count')})" ) open_n = int(ac.get("open_position_count") or _account_open_position_count(ac)) if open_n <= 0: lines.append("当前交易所持仓:无(空仓)") else: lines.append( f"当前交易所持仓:{open_n} 仓 | 浮盈亏合计 {ac.get('float_pnl_u')}U" ) mon = ac.get("monitor_lines") or {} if mon.get("trends"): lines.append("趋势回调计划(本地,非持仓):") for row in mon["trends"][:8]: lines.append(f" - {row}") if mon.get("rolls"): lines.append("顺势加仓(本地,非持仓):") for row in mon["rolls"][:8]: lines.append(f" - {row}") if mon.get("keys"): lines.append("关键位监控(本地,非持仓):") for row in mon["keys"][:8]: lines.append(f" - {row}") if mon.get("orders"): lines.append("进行中的下单监控(本地,非持仓):") for row in mon["orders"][:8]: lines.append(f" - {row}") positions = ac.get("positions") or [] hub_mon = ac.get("hub_monitor") if positions: lines.append("持仓明细(交易所实盘,含止盈止损若已挂):") for p in positions[:8]: if not isinstance(p, dict): continue lines.append(_format_position_detail_line(p, hub_mon)) lines.append( f"Agent合约余额:{ac.get('balance_usdt') if ac.get('balance_usdt') is not None else '未知'} USDT" ) trades_today = ac.get("trades") or [] if trades_today: lines.append("今日平仓明细:") for t in trades_today[:15]: lines.append(f" - {_format_trade_line(t)}") else: lines.append("今日平仓明细:无") issues = ac.get("issues") or [] if issues: lines.append("关注点:" + ";".join(issues)) lines.append("") return "\n".join(lines).strip() def summary_context_hash(payload: dict) -> str: text = format_summary_context_text(payload) return hashlib.sha256(text.encode("utf-8")).hexdigest()[:16] def format_account_remark(ac: dict) -> str: """分户表格备注:监控摘要 + 持仓。""" parts: list[str] = [] mon = ac.get("monitor_lines") or {} if mon.get("trends"): parts.append(f"趋势{len(mon['trends'])}") if mon.get("rolls"): parts.append(f"加仓{len(mon['rolls'])}") if mon.get("keys"): parts.append(f"关键位{len(mon['keys'])}") if mon.get("orders"): parts.append(f"监控单{len(mon['orders'])}") positions = ac.get("positions") or [] if positions: for p in positions[:2]: if not isinstance(p, dict): continue sym = p.get("symbol") or "?" side = p.get("side") or "?" upnl = _position_float_pnl(p) parts.append(f"{sym} {side} 浮{upnl:.2f}U") if len(positions) > 2: parts.append(f"+{len(positions) - 2}仓") if not parts: issues = ac.get("issues") or [] if issues: return ";".join(str(x) for x in issues[:2]) return "无" return ";".join(parts) def format_dashboard_account_detail(ac: dict) -> dict[str, Any]: """数据看板分户卡片:监控仅数量,持仓逐行(含浮盈亏)。""" mon = ac.get("monitor_lines") or {} position_lines: list[dict[str, Any]] = [] for p in _filter_open_positions(ac.get("positions") or []): sym = p.get("symbol") or "?" side = p.get("side") or "?" upnl = _position_float_pnl(p) position_lines.append( { "kind": "position", "text": f"{sym} {side}", "pnl": round(upnl, 4), } ) issues = [str(x) for x in (ac.get("issues") or [])[:3]] return { "monitor_counts": { "keys": len(mon.get("keys") or []), "orders": len(mon.get("orders") or []), "trends": len(mon.get("trends") or []), "rolls": len(mon.get("rolls") or []), }, "position_lines": position_lines, "issues": issues, } def collect_closed_trades_snapshot( accounts: list[dict], *, today: str, yesterday: str | None = None, ) -> list[dict]: rows: list[dict] = [] for ac in accounts or []: name = ac.get("name") or ac.get("key") if yesterday: for t in ac.get("trades_yesterday") or []: if not isinstance(t, dict): continue rows.append({**t, "account_name": name, "trading_day": yesterday}) for t in ac.get("trades") or []: if not isinstance(t, dict): continue rows.append({**t, "account_name": name, "trading_day": today}) rows.sort(key=lambda x: str(x.get("closed_at") or x.get("opened_at") or ""), reverse=True) return rows[:80] def format_chat_position_overview(payload: dict) -> str: totals = payload.get("totals") or {} total_open = int(totals.get("open_position_count") or 0) if total_open <= 0: head = f"【实盘持仓总览】当前空仓(监控户合计 0 仓)。浮盈亏 0U 表示无持仓,不是「有仓但不动」。" else: head = ( f"【实盘持仓总览】监控户合计 {total_open} 仓," f"浮盈亏合计 {totals.get('float_pnl_u')}U。" ) lines = [ head, "【区分】只有带「持仓明细/交易所实盘」字样的才是已开仓;趋势回调、关键位、下单监控、顺势加仓是本地计划/监控,不算持仓。持仓明细若含止损/止盈价,表示已挂条件单或监控计划中有价位。", ] for ac in payload.get("accounts") or []: if ac.get("status") == "未监控": continue n = int(ac.get("open_position_count") or _account_open_position_count(ac)) mc = _monitor_counts(ac) mon_parts = [] if mc["trends"]: mon_parts.append(f"趋势{mc['trends']}") if mc["rolls"]: mon_parts.append(f"加仓{mc['rolls']}") if mc["keys"]: mon_parts.append(f"关键位{mc['keys']}") if mc["orders"]: mon_parts.append(f"监控单{mc['orders']}") mon_txt = f";本地监控 {' '.join(mon_parts)}" if mon_parts else "" if n <= 0: lines.append(f"- {ac.get('name')}:空仓{mon_txt}") else: lines.append( f"- {ac.get('name')}:{n}仓 浮盈亏{ac.get('float_pnl_u')}U{mon_txt}" ) return "\n".join(lines) def format_chat_context_slim(payload: dict) -> str: """聊天专用:不含 180 日资金曲线与昨日平仓明细,避免挤占对话上下文。""" totals = payload.get("totals") or {} day = totals.get("trading_day") lines = [ f"【今日合计 {day}】平仓盈亏 {totals.get('total_pnl_u')}U | " f"笔数 {totals.get('closed_count')}(胜{totals.get('win_count')}/负{totals.get('loss_count')})| " f"实盘持仓 {totals.get('open_position_count', 0)} 仓 | 浮盈亏 {totals.get('float_pnl_u')}U", "【说明】持仓=交易所实盘;趋势/关键位/监控单=本地计划,不等于已开仓。持仓行内「止损/止盈」= 交易所条件单或监控计划价(与监控页一致)。", ] for ac in payload.get("accounts") or []: if ac.get("status") == "未监控": lines.append(f"- {ac.get('name')}:未监控") continue st = ac.get("trade_stats") or {} open_n = int(ac.get("open_position_count") or _account_open_position_count(ac)) pos_txt = "空仓" if open_n <= 0 else f"{open_n}仓 浮盈亏{ac.get('float_pnl_u')}U" mc = _monitor_counts(ac) mon = [] if mc["trends"]: mon.append(f"趋势{mc['trends']}") if mc["rolls"]: mon.append(f"加仓{mc['rolls']}") if mc["keys"]: mon.append(f"关键位{mc['keys']}") if mc["orders"]: mon.append(f"监控单{mc['orders']}") mon_txt = f";监控 {'/'.join(mon)}" if mon else "" lines.append( f"- {ac.get('name')}:{pos_txt} | 今日盈亏{st.get('total_pnl_u')}U " f"({st.get('closed_count')}笔) | 资金{_fmt_fund(ac.get('funding_usdt'))} " f"交易{_fmt_fund(ac.get('trading_usdt'))}{mon_txt}" ) trades = ac.get("trades") or [] if trades: for t in trades[:4]: lines.append(f" · {_format_trade_line(t)}") if len(trades) > 4: lines.append(f" · …共{len(trades)}笔今日平仓") positions = ac.get("positions") or [] hub_mon = ac.get("hub_monitor") for p in positions[:4]: if not isinstance(p, dict): continue lines.append(f" · {_format_position_detail_line(p, hub_mon).lstrip(' - ')}") return "\n".join(lines) def format_chat_context_for_chat( payload: dict, max_chars: int = CHAT_CONTEXT_MAX_CHARS, ) -> str: overview = format_chat_position_overview(payload) body = format_chat_context_slim(payload) text = overview + "\n\n" + body if len(text) <= max_chars: return text budget = max(2000, max_chars - len(overview) - 4) return overview + "\n\n" + body[:budget].rstrip() + "…" def format_chat_context_brief( payload: dict, max_chars: int = CHAT_CONTEXT_MAX_CHARS, ) -> str: return format_chat_context_for_chat(payload, max_chars=max_chars)