Add AI trading supervisor with WeChat push and daily session

Proactive monitoring for manual/hub closes and new opens prevents overtrading via in-app alerts, configurable WeChat links, and supervisor chat.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-23 19:25:01 +08:00
parent d3d366d0ee
commit bfbd6879d6
15 changed files with 1699 additions and 43 deletions
+632
View File
@@ -0,0 +1,632 @@
"""交易监管:事件分类、频率规则、会话消息与企业微信推送。"""
from __future__ import annotations
import json
import os
import threading
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Optional
from hub_trades_lib import current_trading_day, parse_dt_for_trading_day
HUB_DIR = Path(__file__).resolve().parent
STATE_PATH = HUB_DIR / "hub_supervisor_state.json"
PROGRAM_RESULTS = frozenset({"止盈", "止损", "保本止盈", "移动止盈"})
MANUAL_CLOSE_RESULTS = frozenset({"手动平仓"})
HUB_CLOSE_RESULTS = frozenset({"强制清仓"})
WEAK_RESULTS = frozenset({"外部平仓", "时间平仓"})
EVENT_OPEN = "open"
EVENT_MANUAL_CLOSE = "manual_close"
EVENT_HUB_CLOSE = "hub_close"
EVENT_PROGRAM_TP = "program_tp"
EVENT_PROGRAM_SL = "program_sl"
EVENT_EXTERNAL = "external"
EVENT_FREQ_WARN = "freq_warn"
DEFAULT_SUPERVISOR = {
"enabled": True,
"wechat_webhook": "",
"wechat_link_base": "http://127.0.0.1:5100/ai?mode=supervisor",
"wechat_prefix": "【交易监管】",
"wechat_on_program_tp_sl": True,
"manual_close_daily_warn": 2,
"interval_warn_minutes": 15,
"freq_30m_count": 2,
"reopen_after_close_minutes": 30,
}
def _now_str() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _atomic_write(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(tmp, path)
def _load_json(path: Path, default: dict) -> dict:
if not path.is_file():
return dict(default)
try:
loaded = json.loads(path.read_text(encoding="utf-8"))
if isinstance(loaded, dict):
return loaded
except Exception:
pass
return dict(default)
def normalize_supervisor_settings(raw: dict | None) -> dict:
out = dict(DEFAULT_SUPERVISOR)
env_webhook = (os.getenv("SUPERVISOR_WECHAT_WEBHOOK") or "").strip()
env_link = (os.getenv("SUPERVISOR_WECHAT_LINK") or "").strip()
if env_webhook:
out["wechat_webhook"] = env_webhook
if env_link:
out["wechat_link_base"] = env_link
if not isinstance(raw, dict):
return out
for key in DEFAULT_SUPERVISOR:
if key not in raw:
continue
val = raw.get(key)
if key == "enabled" or key == "wechat_on_program_tp_sl":
out[key] = bool(val)
elif key in ("manual_close_daily_warn", "freq_30m_count"):
try:
out[key] = max(1, int(val))
except (TypeError, ValueError):
pass
elif key in ("interval_warn_minutes", "reopen_after_close_minutes"):
try:
out[key] = max(1, int(val))
except (TypeError, ValueError):
pass
elif isinstance(val, str):
out[key] = val.strip()
return out
def load_supervisor_state() -> dict:
data = _load_json(STATE_PATH, {"version": 1, "trading_day": "", "processed": [], "positions": {}, "stats": {}})
data.setdefault("version", 1)
data.setdefault("processed", [])
data.setdefault("positions", {})
data.setdefault("stats", {})
return data
def save_supervisor_state(data: dict) -> None:
processed = list(data.get("processed") or [])
if len(processed) > 500:
processed = processed[-500:]
data["processed"] = processed
_atomic_write(STATE_PATH, data)
def _trade_event_id(trade: dict) -> str:
return "|".join(
[
str(trade.get("account_name") or trade.get("account_key") or ""),
str(trade.get("symbol") or ""),
str(trade.get("closed_at") or ""),
str(trade.get("result") or ""),
str(trade.get("pnl_amount") or ""),
]
)
def classify_close_result(result: str) -> str:
r = (result or "").strip()
if r in PROGRAM_RESULTS:
if r == "止损":
return EVENT_PROGRAM_SL
return EVENT_PROGRAM_TP
if r in MANUAL_CLOSE_RESULTS:
return EVENT_MANUAL_CLOSE
if r in HUB_CLOSE_RESULTS:
return EVENT_HUB_CLOSE
if r in WEAK_RESULTS:
return EVENT_EXTERNAL
return EVENT_EXTERNAL
def is_supervised_event(event_type: str) -> bool:
return event_type in (EVENT_OPEN, EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE)
def is_program_event(event_type: str) -> bool:
return event_type in (EVENT_PROGRAM_TP, EVENT_PROGRAM_SL)
def _position_contracts(pos: dict) -> float:
for key in ("contracts", "contracts_signed", "size"):
try:
v = pos.get(key)
if v is not None and v != "":
return abs(float(v))
except (TypeError, ValueError):
continue
return 0.0
def collect_position_keys(board_payload: dict | None) -> dict[str, dict]:
out: dict[str, dict] = {}
rows = (board_payload or {}).get("rows") or []
for row in rows:
if not isinstance(row, dict):
continue
ex_id = str(row.get("id") or row.get("key") or "")
ex_name = str(row.get("name") or row.get("key") or ex_id)
ag = row.get("agent") or {}
for p in ag.get("positions") or []:
if not isinstance(p, dict):
continue
if _position_contracts(p) < 1e-12:
continue
sym = str(p.get("symbol") or "")
side = str(p.get("side") or "").lower() or "long"
key = f"{ex_id}|{sym}|{side}"
out[key] = {
"exchange_id": ex_id,
"exchange_name": ex_name,
"symbol": sym,
"side": side,
"contracts": _position_contracts(p),
}
return out
def detect_new_opens(
prev_positions: dict[str, dict],
curr_positions: dict[str, dict],
) -> list[dict]:
events = []
for key, info in curr_positions.items():
if key in prev_positions:
continue
events.append({"event_type": EVENT_OPEN, "event_id": f"open:{key}:{_now_str()[:16]}", **info})
return events
def detect_new_closes(
prev_processed: set[str],
closed_trades: list[dict],
) -> list[dict]:
events = []
for trade in closed_trades or []:
if not isinstance(trade, dict):
continue
eid = _trade_event_id(trade)
if eid in prev_processed:
continue
event_type = classify_close_result(str(trade.get("result") or ""))
events.append(
{
"event_type": event_type,
"event_id": f"close:{eid}",
"account_name": trade.get("account_name"),
"symbol": trade.get("symbol"),
"direction": trade.get("direction"),
"result": trade.get("result"),
"pnl_amount": trade.get("pnl_amount"),
"closed_at": trade.get("closed_at"),
}
)
return events
def _parse_event_dt(raw: Any) -> Optional[datetime]:
return parse_dt_for_trading_day(raw)
def _supervised_close_times(stats: dict, trading_day: str) -> list[datetime]:
rows = (stats.get(trading_day) or {}).get("supervised_closes") or []
out = []
for item in rows:
if isinstance(item, dict):
dt = _parse_event_dt(item.get("closed_at") or item.get("at"))
else:
dt = _parse_event_dt(item)
if dt:
out.append(dt)
out.sort()
return out
def _record_supervised_event(stats: dict, trading_day: str, event: dict) -> None:
day_stats = stats.setdefault(trading_day, {})
et = str(event.get("event_type") or "")
if et == EVENT_OPEN:
opens = list(day_stats.get("supervised_opens") or [])
opens.append({"at": _now_str(), "symbol": event.get("symbol")})
day_stats["supervised_opens"] = opens[-50:]
return
if et not in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
return
closes = list(day_stats.get("supervised_closes") or [])
closes.append(
{
"at": _now_str(),
"closed_at": event.get("closed_at"),
"event_type": et,
"pnl_amount": event.get("pnl_amount"),
}
)
day_stats["supervised_closes"] = closes[-50:]
def evaluate_frequency_warnings(
*,
trading_day: str,
event: dict,
stats: dict,
settings: dict,
) -> list[dict]:
if not is_supervised_event(str(event.get("event_type") or "")):
return []
warnings: list[dict] = []
day_stats = stats.setdefault(trading_day, {})
closes = _supervised_close_times(stats, trading_day)
now = datetime.now()
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
evt_dt = _parse_event_dt(event.get("closed_at")) or now
closes = closes + [evt_dt]
closes.sort()
open_count = len(day_stats.get("supervised_opens") or [])
close_count = len(day_stats.get("supervised_closes") or [])
if event.get("event_type") == EVENT_OPEN:
open_count += 1
elif event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
close_count += 1
interval_min = int(settings.get("interval_warn_minutes") or 15)
daily_warn = int(settings.get("manual_close_daily_warn") or 2)
freq_30m = int(settings.get("freq_30m_count") or 2)
reopen_min = int(settings.get("reopen_after_close_minutes") or 30)
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE) and len(closes) >= 2:
prev = closes[-2]
cur = closes[-1]
gap = (cur - prev).total_seconds() / 60.0
if gap < interval_min:
warnings.append(
{
"rule": "INTERVAL_SHORT",
"message": f"两笔手动/中控平间隔仅 {int(gap)} 分钟(阈值 {interval_min} 分钟)",
}
)
recent_closes = [t for t in closes if (now - t).total_seconds() <= 30 * 60]
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE) and len(recent_closes) >= freq_30m:
warnings.append(
{
"rule": "FREQ_30M",
"message": f"30 分钟内手动/中控平已达 {len(recent_closes)} 笔(阈值 {freq_30m} 笔)",
}
)
supervised_total = open_count + close_count
if supervised_total >= daily_warn and event.get("event_type") in (
EVENT_MANUAL_CLOSE,
EVENT_HUB_CLOSE,
EVENT_OPEN,
):
if close_count >= daily_warn:
warnings.append(
{
"rule": "DAILY_COUNT",
"message": f"今日手动/中控平 {close_count} 笔(阈值 {daily_warn} 笔),注意过度交易",
}
)
if event.get("event_type") == EVENT_OPEN and closes:
last_close = closes[-1]
gap_open = (now - last_close).total_seconds() / 60.0
if gap_open < reopen_min:
warnings.append(
{
"rule": "REOPEN_FAST",
"message": f"距上一笔手动/中控平仅 {int(gap_open)} 分钟又新开仓(阈值 {reopen_min} 分钟)",
}
)
loss_streak = 0
for item in reversed((stats.get(trading_day) or {}).get("supervised_closes") or []):
try:
pnl = float((item or {}).get("pnl_amount") or 0)
except (TypeError, ValueError):
pnl = 0.0
if pnl < 0:
loss_streak += 1
else:
break
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
try:
pnl = float(event.get("pnl_amount") or 0)
except (TypeError, ValueError):
pnl = 0.0
if pnl < 0:
loss_streak += 1
else:
loss_streak = 0
if loss_streak >= 2 and event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
warnings.append(
{
"rule": "LOSS_STREAK",
"message": f"连续 {loss_streak} 笔手动/中控亏损,先停一停",
}
)
deduped = []
seen = set()
for w in warnings:
key = w.get("rule")
if key in seen:
continue
seen.add(key)
deduped.append(w)
return deduped
def event_tag(event_type: str) -> str:
return {
EVENT_OPEN: "监管·开仓",
EVENT_MANUAL_CLOSE: "监管·手动平",
EVENT_HUB_CLOSE: "监管·中控平",
EVENT_PROGRAM_TP: "监管·程序止盈",
EVENT_PROGRAM_SL: "监管·程序止损",
EVENT_EXTERNAL: "监管·外部平",
EVENT_FREQ_WARN: "监管·频率",
}.get(event_type, "监管")
def build_system_message(event: dict, *, trading_day: str, warnings: list[dict] | None = None) -> str:
tag = event_tag(str(event.get("event_type") or ""))
ex = event.get("exchange_name") or event.get("account_name") or ""
sym = event.get("symbol") or ""
lines = [f"[{tag}] {ex} · {sym}"]
et = event.get("event_type")
if et == EVENT_OPEN:
side = event.get("side") or event.get("direction") or ""
if side:
lines.append(f"方向:{side}")
elif et in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE, EVENT_PROGRAM_TP, EVENT_PROGRAM_SL, EVENT_EXTERNAL):
res = event.get("result") or ""
pnl = event.get("pnl_amount")
if pnl is not None:
lines.append(f"结果 {res} · 盈亏 {pnl}U")
else:
lines.append(f"结果 {res}")
if event.get("closed_at"):
lines.append(f"平仓时间 {event.get('closed_at')}")
for w in warnings or []:
lines.append(f"{w.get('message')}")
lines.append(f"交易日 {trading_day}")
return "\n".join(lines)
def build_wechat_body(
event: dict,
*,
trading_day: str,
link_base: str,
system_text: str,
) -> str:
link = (link_base or "").strip()
if link:
sep = "&" if "?" in link else "?"
link = f"{link}{sep}day={trading_day}"
body = system_text.replace("\n", "\n")
if link:
body += f"\n详情:{link}"
return body
def should_send_wechat(event: dict, settings: dict) -> bool:
if not settings.get("enabled", True):
return False
webhook = (settings.get("wechat_webhook") or "").strip()
if not webhook or "replace-me" in webhook.lower():
return False
et = str(event.get("event_type") or "")
if is_program_event(et):
return bool(settings.get("wechat_on_program_tp_sl", True))
if et == EVENT_EXTERNAL:
return False
return True
def send_supervisor_wechat(
event: dict,
*,
trading_day: str,
settings: dict,
system_text: str,
) -> bool:
if not should_send_wechat(event, settings):
return False
from wechat_notify_lib import send_wechat_webhook
prefix = (settings.get("wechat_prefix") or "【交易监管】").strip()
body = build_wechat_body(
event,
trading_day=trading_day,
link_base=str(settings.get("wechat_link_base") or ""),
system_text=system_text,
)
return bool(
send_wechat_webhook(
str(settings.get("wechat_webhook") or ""),
body,
prefix=prefix,
)
)
_notify_hook: Optional[Callable[[], None]] = None
def set_supervisor_notify_hook(fn: Optional[Callable[[], None]]) -> None:
global _notify_hook
_notify_hook = fn
def _fire_notify() -> None:
if _notify_hook:
try:
_notify_hook()
except Exception:
pass
def process_supervisor_tick(
dashboard_payload: dict | None,
board_payload: dict | None,
settings_root: dict | None,
*,
reset_hour: int = 8,
ai_reply_fn: Optional[Callable[..., str]] = None,
) -> dict[str, Any]:
"""单次监管扫描:对比快照、写会话、推微信、可选 AI 评语。"""
from hub_ai.supervisor_store import (
append_supervisor_ai_message,
append_supervisor_system_message,
ensure_supervisor_session,
)
sup_cfg = normalize_supervisor_settings((settings_root or {}).get("supervisor"))
if not sup_cfg.get("enabled", True):
return {"ok": True, "skipped": True, "reason": "disabled"}
dash = dashboard_payload or {}
trading_day = str(dash.get("trading_day") or current_trading_day(reset_hour=reset_hour))
state = load_supervisor_state()
if str(state.get("trading_day") or "") != trading_day:
state = {
"version": 1,
"trading_day": trading_day,
"processed": [],
"positions": {},
"stats": {trading_day: state.get("stats", {}).get(trading_day, {})},
}
processed = set(str(x) for x in (state.get("processed") or []))
stats = dict(state.get("stats") or {})
prev_positions = dict(state.get("positions") or {})
curr_positions = collect_position_keys(board_payload)
closed_trades = dash.get("closed_trades") or []
if not state.get("initialized"):
for trade in closed_trades:
if isinstance(trade, dict):
processed.add(f"close:{_trade_event_id(trade)}")
state["trading_day"] = trading_day
state["processed"] = list(processed)
state["positions"] = curr_positions
state["initialized"] = True
save_supervisor_state(state)
return {"ok": True, "events": 0, "seeded": True, "trading_day": trading_day}
raw_events = detect_new_opens(prev_positions, curr_positions) + detect_new_closes(
processed, closed_trades
)
if not raw_events:
state["positions"] = curr_positions
save_supervisor_state(state)
return {"ok": True, "events": 0}
session = ensure_supervisor_session(trading_day)
session_id = str(session.get("id") or "")
handled = 0
for event in raw_events:
eid = str(event.get("event_id") or uuid.uuid4().hex)
if eid in processed:
continue
et = str(event.get("event_type") or "")
if et == EVENT_EXTERNAL:
processed.add(eid)
continue
warnings = evaluate_frequency_warnings(
trading_day=trading_day,
event=event,
stats=stats,
settings=sup_cfg,
)
if is_supervised_event(et):
_record_supervised_event(stats, trading_day, event)
system_text = build_system_message(event, trading_day=trading_day, warnings=warnings)
append_supervisor_system_message(
session_id,
system_text,
event_type=et,
level="warn" if warnings else "info",
)
send_supervisor_wechat(
event,
trading_day=trading_day,
settings=sup_cfg,
system_text=system_text,
)
for w in warnings:
warn_event = {
"event_type": EVENT_FREQ_WARN,
"event_id": f"warn:{eid}:{w.get('rule')}",
**event,
"warn_message": w.get("message"),
}
warn_text = f"[{event_tag(EVENT_FREQ_WARN)}] {w.get('message')}"
append_supervisor_system_message(
session_id,
warn_text,
event_type=EVENT_FREQ_WARN,
level="warn",
)
send_supervisor_wechat(
warn_event,
trading_day=trading_day,
settings=sup_cfg,
system_text=warn_text,
)
if ai_reply_fn and et != EVENT_EXTERNAL:
evt_snapshot = dict(event)
evt_warnings = list(warnings)
def _ai_bg() -> None:
try:
reply = ai_reply_fn(
event=evt_snapshot,
warnings=evt_warnings,
trading_day=trading_day,
session_id=session_id,
)
if reply and reply.strip():
append_supervisor_ai_message(session_id, reply.strip())
_fire_notify()
except Exception:
pass
threading.Thread(target=_ai_bg, daemon=True).start()
processed.add(eid)
handled += 1
state["trading_day"] = trading_day
state["processed"] = list(processed)
state["positions"] = curr_positions
state["stats"] = stats
save_supervisor_state(state)
if handled:
_fire_notify()
return {"ok": True, "events": handled, "trading_day": trading_day, "session_id": session_id}