feat(hub): enrich AI coach with fund history, closed trades, and chat uploads

- Add 15-day fund snapshot store and /api/hub/account on all instances

- Summary includes yesterday/today trades, fund columns, and section 5 操作建议

- Chat context distinguishes empty positions from local monitors

- Support image/document attachments in AI chat

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-07 08:54:20 +08:00
parent 51c59b073b
commit 62e48dab92
19 changed files with 947 additions and 106 deletions
+101
View File
@@ -0,0 +1,101 @@
"""中控 AI 聊天附件解析。"""
from __future__ import annotations
import base64
from typing import Any
from hub_ai.config import (
CHAT_MAX_ATTACHMENTS,
CHAT_MAX_IMAGE_BYTES,
CHAT_MAX_TEXT_FILE_BYTES,
)
IMAGE_MIMES = {
"image/jpeg",
"image/jpg",
"image/png",
"image/webp",
"image/gif",
}
TEXT_MIMES = {
"text/plain",
"text/markdown",
"application/json",
}
def _guess_mime(filename: str, content_type: str) -> str:
ct = (content_type or "").split(";")[0].strip().lower()
if ct:
return ct
name = (filename or "").lower()
if name.endswith(".png"):
return "image/png"
if name.endswith((".jpg", ".jpeg")):
return "image/jpeg"
if name.endswith(".webp"):
return "image/webp"
if name.endswith(".gif"):
return "image/gif"
if name.endswith((".md", ".markdown")):
return "text/markdown"
if name.endswith(".txt"):
return "text/plain"
if name.endswith(".json"):
return "application/json"
return "application/octet-stream"
def parse_chat_attachments(raw_files: list[dict[str, Any]]) -> dict[str, Any]:
"""
raw_files: [{filename, content_type, data: bytes}]
返回 images_b64, attachment_note, attachment_meta, text_append
"""
images_b64: list[str] = []
meta: list[dict] = []
notes: list[str] = []
text_blocks: list[str] = []
errors: list[str] = []
for item in (raw_files or [])[:CHAT_MAX_ATTACHMENTS]:
name = str(item.get("filename") or "file")
data = item.get("data") or b""
if not isinstance(data, (bytes, bytearray)):
errors.append(f"{name}: 无效数据")
continue
mime = _guess_mime(name, str(item.get("content_type") or ""))
size = len(data)
if mime in IMAGE_MIMES:
if size > CHAT_MAX_IMAGE_BYTES:
errors.append(f"{name}: 图片超过 {CHAT_MAX_IMAGE_BYTES // 1024 // 1024}MB")
continue
images_b64.append(base64.b64encode(bytes(data)).decode("ascii"))
meta.append({"name": name, "kind": "image", "mime": mime, "size": size})
notes.append(f"图片 {name}")
continue
if mime in TEXT_MIMES or name.lower().endswith((".txt", ".md", ".markdown", ".json")):
if size > CHAT_MAX_TEXT_FILE_BYTES:
errors.append(f"{name}: 文本超过 {CHAT_MAX_TEXT_FILE_BYTES // 1024}KB")
continue
try:
text = bytes(data).decode("utf-8")
except UnicodeDecodeError:
errors.append(f"{name}: 非 UTF-8 文本")
continue
text_blocks.append(f"--- 附件 {name} ---\n{text.strip()}")
meta.append({"name": name, "kind": "text", "mime": mime, "size": size})
notes.append(f"文档 {name}")
continue
errors.append(f"{name}: 不支持的类型(仅图片或 txt/md/json")
attachment_note = "".join(notes) if notes else ""
if errors:
attachment_note = (attachment_note + "" if attachment_note else "") + "".join(errors)
text_append = "\n\n".join(text_blocks)
return {
"images_b64": images_b64,
"attachment_note": attachment_note,
"attachment_meta": meta,
"text_append": text_append,
"errors": errors,
}
+34 -7
View File
@@ -1,11 +1,12 @@
"""中控 AI:单会话聊天(直到用户点击新开)。"""
from __future__ import annotations
from typing import Any
from typing import Any, Optional
from hub_ai.attachments import parse_chat_attachments
from hub_ai.client import generate_text, model_label
from hub_ai.config import CHAT_MAX_HISTORY_TURNS, CHAT_TEMPERATURE
from hub_ai.context import build_daily_context, format_chat_context_brief
from hub_ai.context import build_daily_context, format_chat_context_for_chat
from hub_ai.prompts import CHAT_SYSTEM, build_chat_user_prompt
from hub_ai.store import (
append_chat_message,
@@ -23,7 +24,12 @@ def _history_lines(messages: list[dict], max_turns: int = CHAT_MAX_HISTORY_TURNS
lines = []
for m in rows:
role = "用户" if m.get("role") == "user" else "搭档"
lines.append(f"{role}{m.get('content') or ''}")
content = m.get("content") or ""
att = m.get("attachments") or []
if att:
names = "".join(str(a.get("name") or "附件") for a in att[:3])
content = f"{content} [附件: {names}]".strip()
lines.append(f"{role}{content}")
return "\n".join(lines)
@@ -47,20 +53,35 @@ def send_chat_message(
message: str,
*,
trading_day: str | None = None,
raw_attachments: Optional[list[dict]] = None,
) -> dict[str, Any]:
text = (message or "").strip()
if not text:
parsed = parse_chat_attachments(raw_attachments or [])
if parsed.get("errors") and not text and not parsed.get("images_b64"):
return {"ok": False, "msg": "".join(parsed["errors"])}
if not text and not parsed.get("images_b64") and not parsed.get("text_append"):
return {"ok": False, "msg": "消息不能为空"}
user_visible = text
if parsed.get("text_append"):
user_visible = (user_visible + "\n\n" + parsed["text_append"]).strip()
if not user_visible and parsed.get("attachment_note"):
user_visible = f"(上传了 {parsed['attachment_note']}"
ctx = build_daily_context(exchanges, trading_day=trading_day)
day = ctx["trading_day"]
session = ensure_active_session(trading_day=day)
sid = session["id"]
history = _history_lines(session.get("messages") or [])
append_chat_message(sid, "user", text)
append_chat_message(
sid,
"user",
user_visible,
attachments=parsed.get("attachment_meta") or [],
)
brief_ctx = format_chat_context_brief(ctx)
brief_ctx = format_chat_context_for_chat(ctx)
excerpt = summary_excerpt_for_chat(day)
user_prompt = build_chat_user_prompt(
@@ -68,12 +89,17 @@ def send_chat_message(
trading_day=day,
summary_excerpt=excerpt,
history_lines=history,
user_message=text,
user_message=text or user_visible,
attachment_note=str(parsed.get("attachment_note") or ""),
)
if parsed.get("text_append"):
user_prompt += "\n\n【附件正文】\n" + parsed["text_append"]
reply = generate_text(
system=CHAT_SYSTEM,
user=user_prompt,
temperature=CHAT_TEMPERATURE,
images_b64=parsed.get("images_b64") or None,
)
if reply.startswith("AI 调用失败"):
return {"ok": False, "msg": reply, "session_id": sid}
@@ -85,4 +111,5 @@ def send_chat_message(
"session": session,
"reply": reply,
"model": model_label(),
"attachment_warnings": parsed.get("errors") or [],
}
+9 -2
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import sys
from pathlib import Path
from typing import Optional, Sequence
_REPO_ROOT = Path(__file__).resolve().parents[2]
if str(_REPO_ROOT) not in sys.path:
@@ -15,6 +16,12 @@ def model_label() -> str:
return ai_provider_label()
def generate_text(*, system: str, user: str, temperature: float) -> str:
def generate_text(
*,
system: str,
user: str,
temperature: float,
images_b64: Optional[Sequence[str]] = None,
) -> str:
prompt = f"{system.strip()}\n\n---\n\n{user.strip()}"
return ai_generate(prompt, temperature=temperature)
return ai_generate(prompt, temperature=temperature, images_b64=images_b64)
+4
View File
@@ -10,6 +10,10 @@ CHAT_TEMPERATURE = 0.5
CHAT_MAX_HISTORY_TURNS = 20
SUMMARY_RETENTION_DAYS = 90
CHAT_SESSION_RETENTION_DAYS = 60
FUND_HISTORY_DAYS = 15
CHAT_MAX_ATTACHMENTS = 3
CHAT_MAX_IMAGE_BYTES = 4 * 1024 * 1024
CHAT_MAX_TEXT_FILE_BYTES = 200 * 1024
def trading_day_reset_hour() -> int:
+357 -41
View File
@@ -4,11 +4,13 @@ from __future__ import annotations
import hashlib
import json
import os
from typing import Any, Callable, Optional
from datetime import datetime, timedelta
from typing import Any, Optional
import httpx
from hub_ai.config import hub_agent_timeout, hub_flask_timeout, trading_day_reset_hour
from hub_ai.config import FUND_HISTORY_DAYS, hub_agent_timeout, hub_flask_timeout, trading_day_reset_hour
from hub_ai.fund_history import format_fund_history_text, get_fund_history, record_fund_snapshot
from hub_trades_lib import current_trading_day, summarize_trades
@@ -35,6 +37,42 @@ def _safe_float(v: Any) -> Optional[float]:
return None
def _position_contracts(p: dict) -> float:
for key in ("contracts", "contracts_signed", "size"):
v = p.get(key)
try:
if v is not None and v != "":
return float(v)
except (TypeError, ValueError):
continue
return 0.0
def _filter_open_positions(positions: list) -> list[dict]:
out: list[dict] = []
for p in positions or []:
if not isinstance(p, dict):
continue
if abs(_position_contracts(p)) < 1e-12:
continue
out.append(p)
return out
def _account_open_position_count(ac: dict) -> int:
return len(_filter_open_positions(ac.get("positions") or []))
def _monitor_counts(ac: dict) -> dict[str, int]:
mon = ac.get("monitor_lines") or {}
return {
"trends": len(mon.get("trends") or []),
"rolls": len(mon.get("rolls") or []),
"keys": len(mon.get("keys") or []),
"orders": len(mon.get("orders") or []),
}
def _position_float_pnl(pos: dict) -> float:
for key in ("unrealized_pnl", "unrealizedPnl", "upnl"):
v = _safe_float(pos.get(key))
@@ -61,17 +99,89 @@ def _collect_open_issues(
issues.append("Flask 监控连接异常")
if day_pnl < -0.01:
issues.append(f"当日平仓亏损 {day_pnl:.2f}U")
float_pnl = sum(_position_float_pnl(p) for p in positions if isinstance(p, dict))
open_positions = _filter_open_positions(positions)
float_pnl = sum(_position_float_pnl(p) for p in open_positions)
if float_pnl < -0.5:
issues.append(f"当前浮亏 {float_pnl:.2f}U")
if isinstance(hub_mon, dict) and hub_mon.get("ok") is not False:
orders = hub_mon.get("orders") or []
trends = hub_mon.get("trends") or []
if positions and not orders and not trends:
if open_positions and not orders and not trends:
issues.append("交易所有持仓但无本地 active 监控/趋势计划")
return issues
def previous_trading_day(trading_day: str) -> str:
day = (trading_day or "").strip()[:10]
if not day:
return day
dt = datetime.strptime(day, "%Y-%m-%d")
return (dt - timedelta(days=1)).strftime("%Y-%m-%d")
def _fmt_fund(v: Any) -> str:
n = _safe_float(v)
if n is None:
return "未知"
return f"{n:.2f}U"
def _format_trade_line(t: dict, *, day_label: str = "") -> str:
prefix = f"[{day_label}] " if day_label else ""
return (
f"{prefix}{t.get('symbol')} {t.get('direction')} {t.get('result')} "
f"{t.get('pnl_amount')}U @ {t.get('closed_at') or '?'}"
)
def _monitor_label(item: dict, default: str = "") -> str:
for key in ("monitor_type_label", "monitor_type", "entry_reason", "source_label"):
val = item.get(key)
if val:
return str(val)
return default
def _format_monitor_sections(hub_mon: Optional[dict]) -> dict[str, list[str]]:
out = {"trends": [], "orders": [], "keys": [], "rolls": []}
if not isinstance(hub_mon, dict) or hub_mon.get("ok") is False:
return out
for t in hub_mon.get("trends") or []:
if not isinstance(t, dict):
continue
out["trends"].append(
f"{t.get('symbol')} {t.get('direction')} "
f"SL={t.get('stop_loss')} TP={t.get('take_profit')} "
f"补仓区[{t.get('add_lower')}~{t.get('add_upper')}] "
f"状态={t.get('status')}"
)
for o in hub_mon.get("orders") or []:
if not isinstance(o, dict):
continue
label = _monitor_label(o, "下单监控")
out["orders"].append(
f"{label}: {o.get('symbol')} {o.get('direction')} "
f"触发={o.get('trigger_price')} SL={o.get('stop_loss')} TP={o.get('take_profit')} "
f"状态={o.get('status')}"
)
for k in hub_mon.get("keys") or []:
if not isinstance(k, dict):
continue
out["keys"].append(
f"关键位: {k.get('symbol')} {k.get('direction')} "
f"上={k.get('upper')} 下={k.get('lower')} 类型={k.get('monitor_type')}"
)
for r in hub_mon.get("rolls") or []:
if not isinstance(r, dict):
continue
out["rolls"].append(
f"顺势加仓: {r.get('symbol')} {r.get('direction')} "
f"腿数={r.get('leg_count')} SL={r.get('current_stop_loss') or r.get('initial_stop_loss')} "
f"状态={r.get('status')}"
)
return out
def _fetch_account_bundle(client: httpx.Client, ex: dict, trading_day: str) -> dict[str, Any]:
name = ex.get("name") or ex.get("key") or ex.get("id")
key = ex.get("key") or ""
@@ -89,8 +199,15 @@ def _fetch_account_bundle(client: httpx.Client, ex: dict, trading_day: str) -> d
"trades": [],
"trade_stats": summarize_trades([]),
"positions": [],
"open_position_count": 0,
"float_pnl_u": 0.0,
"balance_usdt": None,
"funding_usdt": None,
"trading_usdt": None,
"available_trading_usdt": None,
"trades_yesterday": [],
"trade_stats_yesterday": summarize_trades([]),
"monitor_lines": {"trends": [], "orders": [], "keys": [], "rolls": []},
"issues": [],
"agent_ok": False,
"flask_ok": False,
@@ -122,13 +239,30 @@ def _fetch_account_bundle(client: httpx.Client, ex: dict, trading_day: str) -> d
base["balance_usdt"] = _safe_float(agent_body.get("balance_usdt"))
positions = agent_body.get("positions") or []
if isinstance(positions, list):
base["positions"] = positions
base["float_pnl_u"] = round(
sum(_position_float_pnl(p) for p in positions if isinstance(p, dict)), 4
)
open_positions = _filter_open_positions(positions)
base["positions"] = open_positions
base["open_position_count"] = len(open_positions)
base["float_pnl_u"] = round(sum(_position_float_pnl(p) for p in open_positions), 4)
hub_mon = None
prev_day = previous_trading_day(trading_day)
if flask_url:
try:
r = client.get(
f"{flask_url}/api/hub/account",
headers=_hub_headers(),
timeout=hub_flask_timeout(),
)
if r.status_code == 200:
acct_body = r.json()
if isinstance(acct_body, dict) and acct_body.get("ok"):
base["funding_usdt"] = _safe_float(acct_body.get("funding_usdt"))
base["trading_usdt"] = _safe_float(acct_body.get("trading_usdt"))
base["available_trading_usdt"] = _safe_float(acct_body.get("available_trading_usdt"))
base["flask_ok"] = True
except Exception as exc:
base["issues"].append(f"资金接口: {exc}")
try:
r = client.get(
f"{flask_url}/api/hub/trades/today",
@@ -145,6 +279,25 @@ def _fetch_account_bundle(client: httpx.Client, ex: dict, trading_day: str) -> d
except Exception as exc:
base["issues"].append(f"成交接口: {exc}")
if prev_day:
try:
r = client.get(
f"{flask_url}/api/hub/trades/today",
headers=_hub_headers(),
params={"trading_day": prev_day},
timeout=hub_flask_timeout(),
)
if r.status_code == 200:
y_body = r.json()
if isinstance(y_body, dict) and y_body.get("ok"):
base["trades_yesterday"] = y_body.get("trades") or []
base["trade_stats_yesterday"] = y_body.get("stats") or summarize_trades(
base["trades_yesterday"]
)
base["flask_ok"] = True
except Exception as exc:
base["issues"].append(f"昨日成交: {exc}")
try:
r = client.get(
f"{flask_url}/api/hub/monitor",
@@ -158,6 +311,7 @@ def _fetch_account_bundle(client: httpx.Client, ex: dict, trading_day: str) -> d
base["flask_ok"] = True
base["active_orders"] = len(hub_mon.get("orders") or [])
base["active_trends"] = len(hub_mon.get("trends") or [])
base["monitor_lines"] = _format_monitor_sections(hub_mon)
except Exception as exc:
if "成交接口" not in str(base["issues"]):
base["issues"].append(f"监控接口: {exc}")
@@ -198,6 +352,10 @@ def build_daily_context(
total_closed_pnl = 0.0
total_closed = total_win = total_loss = 0
total_float = 0.0
total_funding = 0.0
total_trading = 0.0
total_open_positions = 0
funding_known = trading_known = 0
for ac in accounts:
if ac.get("status") == "未监控":
continue
@@ -207,47 +365,128 @@ def build_daily_context(
total_win += int(st.get("win_count") or 0)
total_loss += int(st.get("loss_count") or 0)
total_float += float(ac.get("float_pnl_u") or 0)
total_open_positions += int(ac.get("open_position_count") or _account_open_position_count(ac))
fu = _safe_float(ac.get("funding_usdt"))
tu = _safe_float(ac.get("trading_usdt"))
if fu is not None:
total_funding += fu
funding_known += 1
if tu is not None:
total_trading += tu
trading_known += 1
if not funding_known:
total_funding = None
if not trading_known:
total_trading = None
totals = {
"trading_day": day,
"prev_trading_day": previous_trading_day(day),
"total_pnl_u": round(total_closed_pnl, 4),
"closed_count": total_closed,
"win_count": total_win,
"loss_count": total_loss,
"float_pnl_u": round(total_float, 4),
"open_position_count": total_open_positions,
"total_funding_usdt": round(total_funding, 4) if total_funding is not None else None,
"total_trading_usdt": round(total_trading, 4) if total_trading is not None else None,
}
record_fund_snapshot(day, accounts, keep_days=FUND_HISTORY_DAYS)
fund_history = get_fund_history(anchor_day=day, keep_days=FUND_HISTORY_DAYS)
account_names = {str(ac.get("key") or ac.get("id")): ac.get("name") for ac in accounts}
fund_history_text = format_fund_history_text(fund_history, account_names=account_names)
payload = {
"trading_day": day,
"prev_trading_day": previous_trading_day(day),
"totals": totals,
"accounts": accounts,
"fund_history": fund_history,
"fund_history_text": fund_history_text,
}
payload = {"trading_day": day, "totals": totals, "accounts": accounts}
text = format_context_text(payload)
digest = hashlib.sha256(text.encode("utf-8")).hexdigest()[:16]
return {"trading_day": day, "totals": totals, "accounts": accounts, "text": text, "context_hash": digest}
return {
"trading_day": day,
"prev_trading_day": previous_trading_day(day),
"totals": totals,
"accounts": accounts,
"fund_history": fund_history,
"fund_history_text": fund_history_text,
"text": text,
"context_hash": digest,
}
def format_context_text(payload: dict) -> str:
lines = []
totals = payload.get("totals") or {}
day = totals.get("trading_day")
prev_day = totals.get("prev_trading_day") or previous_trading_day(str(day or ""))
lines.append(
f"【合计】交易日 {totals.get('trading_day')} | "
f"平仓盈亏 {totals.get('total_pnl_u')}U | "
f"【合计·今日 {day}】平仓盈亏 {totals.get('total_pnl_u')}U | "
f"笔数 {totals.get('closed_count')}(胜{totals.get('win_count')}/负{totals.get('loss_count')}| "
f"监控户浮盈亏合计 {totals.get('float_pnl_u')}U"
f"实盘持仓 {totals.get('open_position_count', 0)} 仓 | "
f"浮盈亏 {totals.get('float_pnl_u')}U | "
f"资金账户合计 {_fmt_fund(totals.get('total_funding_usdt'))} | "
f"交易账户合计 {_fmt_fund(totals.get('total_trading_usdt'))}"
)
lines.append(
f"【对比交易日】昨日={prev_day},今日={day}"
"「持仓」= 交易所 Agent 实盘;「趋势/关键位/监控单/加仓」= 本地计划,不等于已开仓。"
)
fund_txt = str(payload.get("fund_history_text") or "").strip()
if fund_txt:
lines.append("")
lines.append(fund_txt)
lines.append("")
for ac in payload.get("accounts") or []:
st = ac.get("trade_stats") or {}
sty = ac.get("trade_stats_yesterday") or {}
lines.append(f"--- 账户:{ac.get('name')} ({ac.get('key')}) ---")
lines.append(f"状态:{ac.get('status')}")
if ac.get("status") == "未监控":
lines.append("")
continue
lines.append(
f"当日平仓:{st.get('closed_count')} 笔,盈亏 {st.get('total_pnl_u')}U "
f"资金账户 {_fmt_fund(ac.get('funding_usdt'))} | "
f"交易账户 {_fmt_fund(ac.get('trading_usdt'))} | "
f"可用 {_fmt_fund(ac.get('available_trading_usdt'))}"
)
lines.append(
f"今日({day})平仓:{st.get('closed_count')} 笔,盈亏 {st.get('total_pnl_u')}U "
f"(胜{st.get('win_count')}/负{st.get('loss_count')}"
)
lines.append(f"合约可用余额:{ac.get('balance_usdt') if ac.get('balance_usdt') is not None else '未知'} USDT")
lines.append(f"当前持仓浮盈亏:{ac.get('float_pnl_u')}U | 下单监控 {ac.get('active_orders')} | 趋势计划 {ac.get('active_trends')}")
lines.append(
f"昨日({prev_day})平仓:{sty.get('closed_count')} 笔,盈亏 {sty.get('total_pnl_u')}U "
f"(胜{sty.get('win_count')}/负{sty.get('loss_count')}"
)
open_n = int(ac.get("open_position_count") or _account_open_position_count(ac))
if open_n <= 0:
lines.append("当前交易所持仓:无(空仓)")
else:
lines.append(
f"当前交易所持仓:{open_n} 仓 | 浮盈亏合计 {ac.get('float_pnl_u')}U"
)
mon = ac.get("monitor_lines") or {}
if mon.get("trends"):
lines.append("趋势回调计划(本地,非持仓):")
for row in mon["trends"][:8]:
lines.append(f" - {row}")
if mon.get("rolls"):
lines.append("顺势加仓(本地,非持仓):")
for row in mon["rolls"][:8]:
lines.append(f" - {row}")
if mon.get("keys"):
lines.append("关键位监控(本地,非持仓):")
for row in mon["keys"][:8]:
lines.append(f" - {row}")
if mon.get("orders"):
lines.append("进行中的下单监控(本地,非持仓):")
for row in mon["orders"][:8]:
lines.append(f" - {row}")
positions = ac.get("positions") or []
if positions:
lines.append("持仓:")
lines.append("持仓明细(交易所实盘)")
for p in positions[:8]:
if not isinstance(p, dict):
continue
@@ -256,14 +495,21 @@ def format_context_text(payload: dict) -> str:
contracts = p.get("contracts") or p.get("size") or "?"
upnl = _position_float_pnl(p)
lines.append(f" - {sym} {side} 张数{contracts} 浮盈亏{upnl:.4f}U")
trades = ac.get("trades") or []
if trades:
lines.append("当日平仓明细:")
for t in trades[:15]:
lines.append(
f" - {t.get('symbol')} {t.get('direction')} {t.get('result')} "
f"{t.get('pnl_amount')}U @ {t.get('closed_at') or '?'}"
)
lines.append(
f"Agent合约余额:{ac.get('balance_usdt') if ac.get('balance_usdt') is not None else '未知'} USDT"
)
trades_today = ac.get("trades") or []
if trades_today:
lines.append(f"今日平仓明细:")
for t in trades_today[:15]:
lines.append(f" - {_format_trade_line(t)}")
trades_y = ac.get("trades_yesterday") or []
if trades_y:
lines.append(f"昨日平仓明细:")
for t in trades_y[:15]:
lines.append(f" - {_format_trade_line(t)}")
if not trades_today and not trades_y:
lines.append("平仓明细:无")
issues = ac.get("issues") or []
if issues:
lines.append("关注点:" + "".join(issues))
@@ -272,29 +518,99 @@ def format_context_text(payload: dict) -> str:
def format_account_remark(ac: dict) -> str:
"""分户表格备注:持仓摘要或关注点"""
"""分户表格备注:监控摘要 + 持仓。"""
parts: list[str] = []
mon = ac.get("monitor_lines") or {}
if mon.get("trends"):
parts.append(f"趋势{len(mon['trends'])}")
if mon.get("rolls"):
parts.append(f"加仓{len(mon['rolls'])}")
if mon.get("keys"):
parts.append(f"关键位{len(mon['keys'])}")
if mon.get("orders"):
parts.append(f"监控单{len(mon['orders'])}")
positions = ac.get("positions") or []
if not positions:
if positions:
for p in positions[:2]:
if not isinstance(p, dict):
continue
sym = p.get("symbol") or "?"
side = p.get("side") or "?"
upnl = _position_float_pnl(p)
parts.append(f"{sym} {side}{upnl:.2f}U")
if len(positions) > 2:
parts.append(f"+{len(positions) - 2}")
if not parts:
issues = ac.get("issues") or []
if issues:
return "".join(str(x) for x in issues[:2])
return ""
parts: list[str] = []
for p in positions[:3]:
if not isinstance(p, dict):
return "".join(parts)
def collect_closed_trades_snapshot(accounts: list[dict], *, today: str, yesterday: str) -> list[dict]:
rows: list[dict] = []
for ac in accounts or []:
name = ac.get("name") or ac.get("key")
for t in ac.get("trades_yesterday") or []:
if not isinstance(t, dict):
continue
rows.append({**t, "account_name": name, "trading_day": yesterday})
for t in ac.get("trades") or []:
if not isinstance(t, dict):
continue
rows.append({**t, "account_name": name, "trading_day": today})
rows.sort(key=lambda x: str(x.get("closed_at") or x.get("opened_at") or ""), reverse=True)
return rows[:80]
def format_chat_position_overview(payload: dict) -> str:
totals = payload.get("totals") or {}
total_open = int(totals.get("open_position_count") or 0)
if total_open <= 0:
head = f"【实盘持仓总览】当前空仓(监控户合计 0 仓)。浮盈亏 0U 表示无持仓,不是「有仓但不动」。"
else:
head = (
f"【实盘持仓总览】监控户合计 {total_open} 仓,"
f"浮盈亏合计 {totals.get('float_pnl_u')}U。"
)
lines = [
head,
"【区分】只有带「持仓明细/交易所实盘」字样的才是已开仓;趋势回调、关键位、下单监控、顺势加仓是本地计划/监控,不算持仓。",
]
for ac in payload.get("accounts") or []:
if ac.get("status") == "未监控":
continue
sym = p.get("symbol") or "?"
side = p.get("side") or "?"
contracts = p.get("contracts") if p.get("contracts") is not None else p.get("size")
upnl = _position_float_pnl(p)
parts.append(f"持仓: {sym} {side} 张数{contracts} 浮盈亏{upnl:.4f}U")
if len(positions) > 3:
parts.append(f"另有{len(positions) - 3}")
return "".join(parts) if parts else ""
n = int(ac.get("open_position_count") or _account_open_position_count(ac))
mc = _monitor_counts(ac)
mon_parts = []
if mc["trends"]:
mon_parts.append(f"趋势{mc['trends']}")
if mc["rolls"]:
mon_parts.append(f"加仓{mc['rolls']}")
if mc["keys"]:
mon_parts.append(f"关键位{mc['keys']}")
if mc["orders"]:
mon_parts.append(f"监控单{mc['orders']}")
mon_txt = f";本地监控 {' '.join(mon_parts)}" if mon_parts else ""
if n <= 0:
lines.append(f"- {ac.get('name')}:空仓{mon_txt}")
else:
lines.append(
f"- {ac.get('name')}{n}仓 浮盈亏{ac.get('float_pnl_u')}U{mon_txt}"
)
return "\n".join(lines)
def format_chat_context_brief(payload: dict, max_chars: int = 2500) -> str:
text = format_context_text(payload)
def format_chat_context_for_chat(payload: dict, max_chars: int = 5200) -> str:
overview = format_chat_position_overview(payload)
body = format_context_text(payload)
text = overview + "\n\n" + body
if len(text) <= max_chars:
return text
return text[: max_chars - 3].rstrip() + "..."
budget = max(800, max_chars - len(overview) - 4)
return overview + "\n\n" + body[:budget].rstrip() + "..."
def format_chat_context_brief(payload: dict, max_chars: int = 4500) -> str:
return format_chat_context_for_chat(payload, max_chars=max_chars)
+109
View File
@@ -0,0 +1,109 @@
"""中控 AI:分户资金快照(保留 15 天,供总结/聊天上下文)。"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Optional
from hub_ai.config import FUND_HISTORY_DAYS
HUB_DIR = Path(__file__).resolve().parent.parent
FUND_HISTORY_PATH = HUB_DIR / "hub_ai_fund_history.json"
def _now_str() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _atomic_write(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(tmp, path)
def _load_store() -> dict:
if not FUND_HISTORY_PATH.is_file():
return {"version": 1, "days": {}}
try:
loaded = json.loads(FUND_HISTORY_PATH.read_text(encoding="utf-8"))
if isinstance(loaded, dict):
loaded.setdefault("version", 1)
loaded.setdefault("days", {})
return loaded
except Exception:
pass
return {"version": 1, "days": {}}
def _prune_days(days: dict, *, keep_days: int, anchor_day: str) -> dict:
try:
anchor = datetime.strptime(anchor_day[:10], "%Y-%m-%d")
except ValueError:
anchor = datetime.now()
cutoff = (anchor - timedelta(days=max(1, keep_days) - 1)).strftime("%Y-%m-%d")
out = {k: v for k, v in (days or {}).items() if str(k) >= cutoff}
return out
def record_fund_snapshot(
trading_day: str,
accounts: list[dict],
*,
keep_days: int = FUND_HISTORY_DAYS,
) -> dict[str, Any]:
"""写入当日各户资金账户/交易账户余额,并裁剪历史。"""
day = (trading_day or "").strip()[:10]
if not day:
return {}
store = _load_store()
days = dict(store.get("days") or {})
row_accounts: dict[str, dict] = {}
for ac in accounts or []:
key = str(ac.get("key") or ac.get("id") or "")
if not key:
continue
row_accounts[key] = {
"name": ac.get("name"),
"funding_usdt": ac.get("funding_usdt"),
"trading_usdt": ac.get("trading_usdt"),
"recorded_at": _now_str(),
}
days[day] = {"accounts": row_accounts, "updated_at": _now_str()}
days = _prune_days(days, keep_days=keep_days, anchor_day=day)
_atomic_write(FUND_HISTORY_PATH, {"version": 1, "days": days})
return days
def get_fund_history(*, anchor_day: str, keep_days: int = FUND_HISTORY_DAYS) -> dict[str, dict]:
store = _load_store()
days = _prune_days(
dict(store.get("days") or {}),
keep_days=keep_days,
anchor_day=anchor_day,
)
return days
def format_fund_history_text(history: dict[str, dict], *, account_names: Optional[dict[str, str]] = None) -> str:
if not history:
return "(暂无资金历史快照)"
names = account_names or {}
lines = ["【近15日资金快照(资金账户 / 交易账户 USDT)】"]
for day in sorted(history.keys()):
block = history.get(day) or {}
ac_map = block.get("accounts") or {}
if not ac_map:
continue
parts = []
for key, ac in ac_map.items():
label = names.get(key) or ac.get("name") or key
fu = ac.get("funding_usdt")
tu = ac.get("trading_usdt")
fu_txt = f"{fu}U" if fu is not None else "未知"
tu_txt = f"{tu}U" if tu is not None else "未知"
parts.append(f"{label}: 资金{fu_txt} / 交易{tu_txt}")
lines.append(f"- {day}: " + "".join(parts))
return "\n".join(lines) if len(lines) > 1 else "(暂无资金历史快照)"
+28 -12
View File
@@ -5,27 +5,34 @@ SUMMARY_SYSTEM = """
硬性规则:
- 只能陈述数据中明确出现的数字与事实;禁止编造成交、止损、扛单、行情预测。
- 上下文含「昨日+今日」两个交易日的平仓与「近15日资金快照」;须连贯引用,不得只写单日而忽略另一日。
- 未监控的账户必须标注「未监控」,不得臆测其盈亏。
- 连接失败或数据缺失的账户如实写明,不要猜测。
- 不要用安慰、说教、建议口吻(那些属于聊天功能)
- 趋势回调计划、顺势加仓、关键位监控、进行中的下单监控:仅据数据列示,无则写「无」
- 第1~4节保持客观台账;**第5节操作建议**可基于资金账户/交易账户余额、15日资金变化、仓位与监控单,给出简短、可执行的资金与仓位安排建议(仍禁止预测涨跌、保证收益)。
- 禁止夸张词(致命、崩溃、灾难等)。
输出格式(Markdown,标题必须一致):
**今日交易总结({trading_day}**
**1. 总览**
- **合计盈亏(U)**:…
- **平仓笔数**:…(胜 / 负 / 平)
- **当前持仓浮盈亏(U)**:…(仅汇总已监控且有数据的账户)
- **对比说明**:昨日 vs 今日(交易日日期见数据)
- **合计盈亏(U)**:今日平仓合计 …
- **平仓笔数**:今日 …(胜 / 负 / 平);昨日 …
- **当前持仓浮盈亏(U)**:…
- **资金合计**:资金账户 … / 交易账户 …(仅已监控且有数据账户)
**2. 分户明细**
中控页面会自动渲染分户表格,本节不要输出 pipe 分隔行或 Markdown 表格;可写一句「见下表」或直接留空。
**3. 需关注**
仅有依据时列出(如:某户当日亏损最大、浮亏偏大、Flask/Agent 异常、有持仓但无本地监控等);若无则写「无」。
仅有依据时列出(亏损、浮亏、监控/趋势/关键位异常、资金缺口等);若无则写「无」。
**4. 数据说明**
列出数据缺口(某户未启用、接口失败等)。
列出数据缺口(某户未启用、接口失败、缺15日资金快照等)。
**5. 操作建议**
基于各户资金账户与交易账户余额、近15日资金走势、持仓与监控单,给出 2~5 条简短建议(如:是否需要从资金账户补充交易账户、哪户风险敞口偏高等)。无依据则写「暂无」。
""".strip()
@@ -38,16 +45,19 @@ CHAT_SYSTEM = """
- 不要「第1点第2点你应该…」;不要「作为你的教练我必须…」。
- 不预测涨跌,不保证收益,不替用户做决定。
- 只能依据提供的监控与交易数据说话;看不到的就说「我这边看不到,你可以去 xx 实例页确认」。
若附带「今日总结摘要」,可自然引用,但保持口语,不要复读整份报告
- **持仓判定**:只有快照里「实盘持仓总览 / 持仓明细 / 交易所实盘」才算已开仓;「空仓 / 0 仓」就是没仓位。浮盈亏 0U 且空仓时,不要说「还有仓」「卡着不动」。
- **监控单 ≠ 持仓**:趋势回调、关键位、顺势加仓、下单监控是本地计划或挂单监控,用户说已平仓时,即使还有这些监控,也不要当成手里还有仓
- 用户口述与快照冲突时,以快照为准并口语说明「我这边看到是空仓/有N仓」。
- 若附带「今日总结摘要」,那是较早生成的缓存,**实盘持仓以【当前多账户快照】里的「实盘持仓总览」为准**,摘要里若提到持仓可能已过时。
- 若用户上传图片,可结合图中可见信息讨论,看不清的明确说看不清。
""".strip()
def build_summary_user_prompt(context_text: str, trading_day: str) -> str:
return f"""
交易日:{trading_day}
交易日(今日){trading_day}
以下为中控聚合的多账户数据(含未监控账户标记):
以下为中控聚合的多账户数据(含昨日+今日平仓、近15日资金快照、趋势回调/顺势加仓/关键位/监控单):
{context_text}
""".strip()
@@ -60,15 +70,21 @@ def build_chat_user_prompt(
summary_excerpt: str,
history_lines: str,
user_message: str,
attachment_note: str = "",
) -> str:
parts = [
f"【交易日】{trading_day}",
"【当前多账户快照】",
"【当前多账户快照(含实盘持仓与本地监控,发送时已刷新)",
context_text.strip() or "(无监控数据)",
]
if summary_excerpt.strip():
parts.extend(["【今日总结摘要(供参考)】", summary_excerpt.strip()])
parts.extend([
"【今日总结摘要(可能滞后,持仓以快照「实盘持仓总览」为准)】",
summary_excerpt.strip(),
])
if history_lines.strip():
parts.extend(["【此前对话】", history_lines.strip()])
if attachment_note.strip():
parts.extend(["【用户附件说明】", attachment_note.strip()])
parts.extend(["【用户现在说】", user_message.strip()])
return "\n\n".join(parts)
+21 -4
View File
@@ -3,7 +3,7 @@ from __future__ import annotations
from typing import Callable
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, File, Form, HTTPException, UploadFile
from pydantic import BaseModel, Field
from hub_ai.chat import get_chat_state, send_chat_message, start_new_chat
@@ -94,12 +94,29 @@ def create_hub_ai_router(*, load_all_exchanges: Callable[[], list]) -> APIRouter
return start_new_chat(trading_day=day)
@router.post("/chat/send")
def api_ai_chat_send(body: ChatSendBody):
async def api_ai_chat_send(
message: str = Form(""),
trading_day: str = Form(""),
files: list[UploadFile] = File(default=[]),
):
exchanges = load_all_exchanges()
raw_attachments = []
for f in files or []:
if not f or not f.filename:
continue
data = await f.read()
raw_attachments.append(
{
"filename": f.filename,
"content_type": f.content_type or "",
"data": data,
}
)
result = send_chat_message(
exchanges,
body.message,
trading_day=_day(body.trading_day) if body.trading_day.strip() else None,
message,
trading_day=_day(trading_day) if trading_day.strip() else None,
raw_attachments=raw_attachments,
)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "发送失败")
+9 -1
View File
@@ -163,7 +163,13 @@ def ensure_active_session(*, trading_day: str) -> dict:
return create_new_session(trading_day=trading_day)
def append_chat_message(session_id: str, role: str, content: str) -> dict:
def append_chat_message(
session_id: str,
role: str,
content: str,
*,
attachments: Optional[list] = None,
) -> dict:
store = load_chat_store()
sessions = store.get("sessions") or []
target = None
@@ -174,6 +180,8 @@ def append_chat_message(session_id: str, role: str, content: str) -> dict:
if not target:
raise KeyError("session_not_found")
msg = {"role": role, "content": content.strip(), "at": _now_str()}
if attachments:
msg["attachments"] = list(attachments)
target.setdefault("messages", []).append(msg)
target["updated_at"] = _now_str()
if role == "user" and (target.get("title") in (None, "", "新对话")):
+37 -17
View File
@@ -4,11 +4,46 @@ from __future__ import annotations
from typing import Any
from hub_ai.client import generate_text, model_label
from hub_ai.context import build_daily_context, format_account_remark
from hub_ai.context import (
build_daily_context,
collect_closed_trades_snapshot,
format_account_remark,
)
from hub_ai.prompts import SUMMARY_SYSTEM, build_summary_user_prompt
from hub_ai.store import append_summary, get_latest_summary, list_summaries
def _stats_snapshot_from_ctx(ctx: dict) -> dict:
day = ctx.get("trading_day")
prev = ctx.get("prev_trading_day")
accounts = ctx.get("accounts") or []
return {
"totals": ctx.get("totals"),
"prev_trading_day": prev,
"fund_history": ctx.get("fund_history"),
"closed_trades": collect_closed_trades_snapshot(accounts, today=day, yesterday=prev),
"by_account": {
str(ac.get("key") or ac.get("id")): {
"key": ac.get("key"),
"name": ac.get("name"),
"status": ac.get("status"),
"funding_usdt": ac.get("funding_usdt"),
"trading_usdt": ac.get("trading_usdt"),
"available_trading_usdt": ac.get("available_trading_usdt"),
"pnl_u": (ac.get("trade_stats") or {}).get("total_pnl_u"),
"pnl_u_yesterday": (ac.get("trade_stats_yesterday") or {}).get("total_pnl_u"),
"closed_count": (ac.get("trade_stats") or {}).get("closed_count"),
"closed_count_yesterday": (ac.get("trade_stats_yesterday") or {}).get("closed_count"),
"float_pnl_u": ac.get("float_pnl_u"),
"remark": format_account_remark(ac),
"monitor_lines": ac.get("monitor_lines") or {},
"issues": ac.get("issues") or [],
}
for ac in accounts
},
}
def generate_daily_summary(
exchanges: list[dict],
*,
@@ -34,22 +69,7 @@ def generate_daily_summary(
if content.startswith("AI 调用失败"):
return {"ok": False, "msg": content, "trading_day": day}
stats_snapshot = {
"totals": ctx.get("totals"),
"by_account": {
str(ac.get("key") or ac.get("id")): {
"key": ac.get("key"),
"name": ac.get("name"),
"status": ac.get("status"),
"pnl_u": (ac.get("trade_stats") or {}).get("total_pnl_u"),
"closed_count": (ac.get("trade_stats") or {}).get("closed_count"),
"float_pnl_u": ac.get("float_pnl_u"),
"remark": format_account_remark(ac),
"issues": ac.get("issues") or [],
}
for ac in ctx.get("accounts") or []
},
}
stats_snapshot = _stats_snapshot_from_ctx(ctx)
row = append_summary(
trading_day=day,
content_md=content,