54ba412d1d
Co-authored-by: Cursor <cursoragent@cursor.com>
758 lines
25 KiB
Python
758 lines
25 KiB
Python
"""交易监管:事件分类、频率规则、会话消息与企业微信推送。"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import threading
|
||
import uuid
|
||
from datetime import datetime, timedelta
|
||
from pathlib import Path
|
||
from typing import Any, Callable, Optional
|
||
|
||
from hub_trades_lib import current_trading_day, parse_dt_for_trading_day
|
||
|
||
HUB_DIR = Path(__file__).resolve().parent
|
||
STATE_PATH = HUB_DIR / "hub_supervisor_state.json"
|
||
|
||
PROGRAM_RESULTS = frozenset({"止盈", "止损", "保本止盈", "移动止盈"})
|
||
MANUAL_CLOSE_RESULTS = frozenset({"手动平仓"})
|
||
HUB_CLOSE_RESULTS = frozenset({"强制清仓"})
|
||
WEAK_RESULTS = frozenset({"外部平仓", "时间平仓"})
|
||
|
||
EVENT_OPEN = "open"
|
||
EVENT_MANUAL_CLOSE = "manual_close"
|
||
EVENT_HUB_CLOSE = "hub_close"
|
||
EVENT_PROGRAM_TP = "program_tp"
|
||
EVENT_PROGRAM_SL = "program_sl"
|
||
EVENT_EXTERNAL = "external"
|
||
EVENT_FREQ_WARN = "freq_warn"
|
||
|
||
DEFAULT_SUPERVISOR = {
|
||
"enabled": True,
|
||
"wechat_webhook": "",
|
||
"wechat_link_base": "http://127.0.0.1:5100/ai?mode=supervisor",
|
||
"wechat_prefix": "【交易监管】",
|
||
"wechat_on_program_tp_sl": True,
|
||
"manual_close_daily_warn": 2,
|
||
"interval_warn_minutes": 15,
|
||
"freq_30m_count": 2,
|
||
"reopen_after_close_minutes": 30,
|
||
}
|
||
|
||
|
||
def _now_str() -> str:
|
||
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
|
||
def _atomic_write(path: Path, data: dict) -> None:
|
||
path.parent.mkdir(parents=True, exist_ok=True)
|
||
tmp = path.with_suffix(path.suffix + ".tmp")
|
||
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
|
||
os.replace(tmp, path)
|
||
|
||
|
||
def _load_json(path: Path, default: dict) -> dict:
|
||
if not path.is_file():
|
||
return dict(default)
|
||
try:
|
||
loaded = json.loads(path.read_text(encoding="utf-8"))
|
||
if isinstance(loaded, dict):
|
||
return loaded
|
||
except Exception:
|
||
pass
|
||
return dict(default)
|
||
|
||
|
||
def normalize_supervisor_settings(raw: dict | None) -> dict:
|
||
out = dict(DEFAULT_SUPERVISOR)
|
||
env_webhook = (os.getenv("SUPERVISOR_WECHAT_WEBHOOK") or "").strip()
|
||
env_link = (os.getenv("SUPERVISOR_WECHAT_LINK") or "").strip()
|
||
if env_webhook:
|
||
out["wechat_webhook"] = env_webhook
|
||
if env_link:
|
||
out["wechat_link_base"] = env_link
|
||
if not isinstance(raw, dict):
|
||
return out
|
||
for key in DEFAULT_SUPERVISOR:
|
||
if key not in raw:
|
||
continue
|
||
val = raw.get(key)
|
||
if key == "enabled" or key == "wechat_on_program_tp_sl":
|
||
out[key] = bool(val)
|
||
elif key in ("manual_close_daily_warn", "freq_30m_count"):
|
||
try:
|
||
out[key] = max(1, int(val))
|
||
except (TypeError, ValueError):
|
||
pass
|
||
elif key in ("interval_warn_minutes", "reopen_after_close_minutes"):
|
||
try:
|
||
out[key] = max(1, int(val))
|
||
except (TypeError, ValueError):
|
||
pass
|
||
elif isinstance(val, str):
|
||
out[key] = val.strip()
|
||
return out
|
||
|
||
|
||
def load_supervisor_state() -> dict:
|
||
data = _load_json(STATE_PATH, {"version": 1, "trading_day": "", "processed": [], "positions": {}, "stats": {}})
|
||
data.setdefault("version", 1)
|
||
data.setdefault("processed", [])
|
||
data.setdefault("positions", {})
|
||
data.setdefault("stats", {})
|
||
return data
|
||
|
||
|
||
def save_supervisor_state(data: dict) -> None:
|
||
processed = list(data.get("processed") or [])
|
||
if len(processed) > 500:
|
||
processed = processed[-500:]
|
||
data["processed"] = processed
|
||
_atomic_write(STATE_PATH, data)
|
||
|
||
|
||
def _trade_event_id(trade: dict) -> str:
|
||
return "|".join(
|
||
[
|
||
str(trade.get("account_name") or trade.get("account_key") or ""),
|
||
str(trade.get("symbol") or ""),
|
||
str(trade.get("closed_at") or ""),
|
||
str(trade.get("result") or ""),
|
||
str(trade.get("pnl_amount") or ""),
|
||
]
|
||
)
|
||
|
||
|
||
def classify_close_result(result: str) -> str:
|
||
r = (result or "").strip()
|
||
if r in PROGRAM_RESULTS:
|
||
if r == "止损":
|
||
return EVENT_PROGRAM_SL
|
||
return EVENT_PROGRAM_TP
|
||
if r in MANUAL_CLOSE_RESULTS:
|
||
return EVENT_MANUAL_CLOSE
|
||
if r in HUB_CLOSE_RESULTS:
|
||
return EVENT_HUB_CLOSE
|
||
if r in WEAK_RESULTS:
|
||
return EVENT_EXTERNAL
|
||
return EVENT_EXTERNAL
|
||
|
||
|
||
def is_supervised_event(event_type: str) -> bool:
|
||
return event_type in (EVENT_OPEN, EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE)
|
||
|
||
|
||
def is_program_event(event_type: str) -> bool:
|
||
return event_type in (EVENT_PROGRAM_TP, EVENT_PROGRAM_SL)
|
||
|
||
|
||
def _normalize_position_symbol(sym: str) -> str:
|
||
"""统一合约名,避免 ZEC/USDT 与 ZEC/USDT:USDT 被当成两笔持仓。"""
|
||
s = (sym or "").strip().upper()
|
||
if not s:
|
||
return ""
|
||
if s.endswith(":USDT") and "/" in s:
|
||
return s.rsplit(":", 1)[0]
|
||
return s
|
||
|
||
|
||
def _position_key(exchange_id: str, symbol: str, side: str) -> str:
|
||
sym = _normalize_position_symbol(symbol)
|
||
sd = (side or "long").strip().lower() or "long"
|
||
return f"{exchange_id}|{sym}|{sd}"
|
||
|
||
|
||
def _position_contracts(pos: dict) -> float:
|
||
for key in ("contracts", "contracts_signed", "size"):
|
||
try:
|
||
v = pos.get(key)
|
||
if v is not None and v != "":
|
||
return abs(float(v))
|
||
except (TypeError, ValueError):
|
||
continue
|
||
return 0.0
|
||
|
||
|
||
def collect_position_keys(board_payload: dict | None) -> dict[str, dict]:
|
||
out: dict[str, dict] = {}
|
||
rows = (board_payload or {}).get("rows") or []
|
||
for row in rows:
|
||
if not isinstance(row, dict):
|
||
continue
|
||
ex_id = str(row.get("id") or row.get("key") or "")
|
||
ex_name = str(row.get("name") or row.get("key") or ex_id)
|
||
ag = row.get("agent") or {}
|
||
for p in ag.get("positions") or []:
|
||
if not isinstance(p, dict):
|
||
continue
|
||
if _position_contracts(p) < 1e-12:
|
||
continue
|
||
sym = str(p.get("symbol") or "")
|
||
side = str(p.get("side") or "").lower() or "long"
|
||
key = _position_key(ex_id, sym, side)
|
||
out[key] = {
|
||
"exchange_id": ex_id,
|
||
"exchange_name": ex_name,
|
||
"symbol": sym,
|
||
"side": side,
|
||
"contracts": _position_contracts(p),
|
||
}
|
||
return out
|
||
|
||
|
||
def _board_agent_snapshot_ready(board_payload: dict | None) -> bool:
|
||
"""监控板各启用账户 agent 快照已就绪(避免空板先入库导致后续持仓误判为新开)。"""
|
||
if not isinstance(board_payload, dict) or board_payload.get("ok") is False:
|
||
return False
|
||
rows = board_payload.get("rows") or []
|
||
if not rows:
|
||
return False
|
||
seen = 0
|
||
for row in rows:
|
||
if not isinstance(row, dict):
|
||
continue
|
||
if row.get("enabled") is False:
|
||
continue
|
||
ag = row.get("agent")
|
||
if not isinstance(ag, dict):
|
||
return False
|
||
seen += 1
|
||
return seen > 0
|
||
|
||
|
||
def _entry_contracts(entry: dict | None) -> float:
|
||
if not isinstance(entry, dict):
|
||
return 0.0
|
||
try:
|
||
return float(entry.get("contracts") or 0)
|
||
except (TypeError, ValueError):
|
||
return 0.0
|
||
|
||
|
||
def detect_new_opens(
|
||
prev_positions: dict[str, dict],
|
||
curr_positions: dict[str, dict],
|
||
) -> list[dict]:
|
||
"""仅当某合约从空仓变为有仓时视为新开(已有持仓不加仓不算)。"""
|
||
events = []
|
||
for key, info in curr_positions.items():
|
||
curr_c = _entry_contracts(info)
|
||
if curr_c < 1e-12:
|
||
continue
|
||
prev_c = _entry_contracts(prev_positions.get(key))
|
||
if prev_c >= 1e-12:
|
||
continue
|
||
events.append({"event_type": EVENT_OPEN, "event_id": f"open:{key}:{_now_str()[:16]}", **info})
|
||
return events
|
||
|
||
|
||
def detect_new_closes(
|
||
prev_processed: set[str],
|
||
closed_trades: list[dict],
|
||
) -> list[dict]:
|
||
events = []
|
||
for trade in closed_trades or []:
|
||
if not isinstance(trade, dict):
|
||
continue
|
||
eid = _trade_event_id(trade)
|
||
if eid in prev_processed:
|
||
continue
|
||
event_type = classify_close_result(str(trade.get("result") or ""))
|
||
events.append(
|
||
{
|
||
"event_type": event_type,
|
||
"event_id": f"close:{eid}",
|
||
"account_name": trade.get("account_name"),
|
||
"symbol": trade.get("symbol"),
|
||
"direction": trade.get("direction"),
|
||
"result": trade.get("result"),
|
||
"pnl_amount": trade.get("pnl_amount"),
|
||
"closed_at": trade.get("closed_at"),
|
||
}
|
||
)
|
||
return events
|
||
|
||
|
||
def _parse_event_dt(raw: Any) -> Optional[datetime]:
|
||
return parse_dt_for_trading_day(raw)
|
||
|
||
|
||
def _supervised_close_times(stats: dict, trading_day: str) -> list[datetime]:
|
||
rows = (stats.get(trading_day) or {}).get("supervised_closes") or []
|
||
out = []
|
||
for item in rows:
|
||
if isinstance(item, dict):
|
||
dt = _parse_event_dt(item.get("closed_at") or item.get("at"))
|
||
else:
|
||
dt = _parse_event_dt(item)
|
||
if dt:
|
||
out.append(dt)
|
||
out.sort()
|
||
return out
|
||
|
||
|
||
def _record_supervised_event(stats: dict, trading_day: str, event: dict) -> None:
|
||
day_stats = stats.setdefault(trading_day, {})
|
||
et = str(event.get("event_type") or "")
|
||
if et == EVENT_OPEN:
|
||
opens = list(day_stats.get("supervised_opens") or [])
|
||
opens.append({"at": _now_str(), "symbol": event.get("symbol")})
|
||
day_stats["supervised_opens"] = opens[-50:]
|
||
return
|
||
if et not in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
|
||
return
|
||
closes = list(day_stats.get("supervised_closes") or [])
|
||
closes.append(
|
||
{
|
||
"at": _now_str(),
|
||
"closed_at": event.get("closed_at"),
|
||
"event_type": et,
|
||
"pnl_amount": event.get("pnl_amount"),
|
||
}
|
||
)
|
||
day_stats["supervised_closes"] = closes[-50:]
|
||
|
||
|
||
def evaluate_frequency_warnings(
|
||
*,
|
||
trading_day: str,
|
||
event: dict,
|
||
stats: dict,
|
||
settings: dict,
|
||
) -> list[dict]:
|
||
if not is_supervised_event(str(event.get("event_type") or "")):
|
||
return []
|
||
warnings: list[dict] = []
|
||
day_stats = stats.setdefault(trading_day, {})
|
||
closes = _supervised_close_times(stats, trading_day)
|
||
now = datetime.now()
|
||
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
|
||
evt_dt = _parse_event_dt(event.get("closed_at")) or now
|
||
closes = closes + [evt_dt]
|
||
closes.sort()
|
||
open_count = len(day_stats.get("supervised_opens") or [])
|
||
close_count = len(day_stats.get("supervised_closes") or [])
|
||
if event.get("event_type") == EVENT_OPEN:
|
||
open_count += 1
|
||
elif event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
|
||
close_count += 1
|
||
|
||
interval_min = int(settings.get("interval_warn_minutes") or 15)
|
||
daily_warn = int(settings.get("manual_close_daily_warn") or 2)
|
||
freq_30m = int(settings.get("freq_30m_count") or 2)
|
||
reopen_min = int(settings.get("reopen_after_close_minutes") or 30)
|
||
|
||
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE) and len(closes) >= 2:
|
||
prev = closes[-2]
|
||
cur = closes[-1]
|
||
gap = (cur - prev).total_seconds() / 60.0
|
||
if gap < interval_min:
|
||
warnings.append(
|
||
{
|
||
"rule": "INTERVAL_SHORT",
|
||
"message": f"两笔手动/中控平间隔仅 {int(gap)} 分钟(阈值 {interval_min} 分钟)",
|
||
}
|
||
)
|
||
|
||
recent_closes = [t for t in closes if (now - t).total_seconds() <= 30 * 60]
|
||
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE) and len(recent_closes) >= freq_30m:
|
||
warnings.append(
|
||
{
|
||
"rule": "FREQ_30M",
|
||
"message": f"30 分钟内手动/中控平已达 {len(recent_closes)} 笔(阈值 {freq_30m} 笔)",
|
||
}
|
||
)
|
||
|
||
supervised_total = open_count + close_count
|
||
if supervised_total >= daily_warn and event.get("event_type") in (
|
||
EVENT_MANUAL_CLOSE,
|
||
EVENT_HUB_CLOSE,
|
||
EVENT_OPEN,
|
||
):
|
||
if close_count >= daily_warn:
|
||
warnings.append(
|
||
{
|
||
"rule": "DAILY_COUNT",
|
||
"message": f"今日手动/中控平 {close_count} 笔(阈值 {daily_warn} 笔),注意过度交易",
|
||
}
|
||
)
|
||
|
||
if event.get("event_type") == EVENT_OPEN and closes:
|
||
last_close = closes[-1]
|
||
gap_open = (now - last_close).total_seconds() / 60.0
|
||
if gap_open < reopen_min:
|
||
warnings.append(
|
||
{
|
||
"rule": "REOPEN_FAST",
|
||
"message": f"距上一笔手动/中控平仅 {int(gap_open)} 分钟又新开仓(阈值 {reopen_min} 分钟)",
|
||
}
|
||
)
|
||
|
||
loss_streak = 0
|
||
for item in reversed((stats.get(trading_day) or {}).get("supervised_closes") or []):
|
||
try:
|
||
pnl = float((item or {}).get("pnl_amount") or 0)
|
||
except (TypeError, ValueError):
|
||
pnl = 0.0
|
||
if pnl < 0:
|
||
loss_streak += 1
|
||
else:
|
||
break
|
||
if event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
|
||
try:
|
||
pnl = float(event.get("pnl_amount") or 0)
|
||
except (TypeError, ValueError):
|
||
pnl = 0.0
|
||
if pnl < 0:
|
||
loss_streak += 1
|
||
else:
|
||
loss_streak = 0
|
||
if loss_streak >= 2 and event.get("event_type") in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
|
||
warnings.append(
|
||
{
|
||
"rule": "LOSS_STREAK",
|
||
"message": f"连续 {loss_streak} 笔手动/中控亏损,先停一停",
|
||
}
|
||
)
|
||
|
||
deduped = []
|
||
seen = set()
|
||
for w in warnings:
|
||
key = w.get("rule")
|
||
if key in seen:
|
||
continue
|
||
seen.add(key)
|
||
deduped.append(w)
|
||
return deduped
|
||
|
||
|
||
def event_tag(event_type: str) -> str:
|
||
return {
|
||
EVENT_OPEN: "监管·开仓",
|
||
EVENT_MANUAL_CLOSE: "监管·手动平",
|
||
EVENT_HUB_CLOSE: "监管·中控平",
|
||
EVENT_PROGRAM_TP: "监管·程序止盈",
|
||
EVENT_PROGRAM_SL: "监管·程序止损",
|
||
EVENT_EXTERNAL: "监管·外部平",
|
||
EVENT_FREQ_WARN: "监管·频率",
|
||
}.get(event_type, "监管")
|
||
|
||
|
||
def _fmt_pnl_u(pnl: Any) -> str:
|
||
try:
|
||
v = float(pnl)
|
||
sign = "+" if v > 0 else ""
|
||
return f"{sign}{v:.4f}".rstrip("0").rstrip(".") + "U"
|
||
except (TypeError, ValueError):
|
||
return ""
|
||
|
||
|
||
def build_supervisor_fallback_reply(event: dict, warnings: list[dict] | None = None) -> str:
|
||
"""AI 不可用或返回空时的短评语(不展示错误文案)。"""
|
||
et = str(event.get("event_type") or "")
|
||
sym = str(event.get("symbol") or "—")
|
||
ex = str(event.get("exchange_name") or event.get("account_name") or "").strip()
|
||
pnl_txt = _fmt_pnl_u(event.get("pnl_amount"))
|
||
warn = (warnings or [])[:1]
|
||
warn_txt = str(warn[0].get("message") or "").strip() if warn else ""
|
||
|
||
if et == EVENT_PROGRAM_SL:
|
||
base = f"{sym} 程序止损"
|
||
if pnl_txt:
|
||
base += f"({pnl_txt})"
|
||
base += ",按计划出场是纪律。先歇一会儿,别急着马上再开。"
|
||
elif et == EVENT_PROGRAM_TP:
|
||
base = f"{sym} 程序止盈"
|
||
if pnl_txt:
|
||
base += f"({pnl_txt})"
|
||
base += ",执行不错。保持节奏,别立刻反手再开一单。"
|
||
elif et == EVENT_OPEN:
|
||
who = f"{ex} " if ex else ""
|
||
base = f"看到 {who}新开 {sym}。动手前确认是不是计划内,别因为上一笔情绪再开。"
|
||
elif et == EVENT_HUB_CLOSE:
|
||
base = f"中控平了 {sym}"
|
||
if pnl_txt:
|
||
base += f"({pnl_txt})"
|
||
base += "。"
|
||
base += f" {warn_txt}" if warn_txt else " 停一停,别连着手痒。"
|
||
elif et == EVENT_MANUAL_CLOSE:
|
||
base = f"手动平了 {sym}"
|
||
if pnl_txt:
|
||
base += f"({pnl_txt})"
|
||
base += "。"
|
||
base += f" {warn_txt}" if warn_txt else " 想好再开下一单。"
|
||
elif et == EVENT_FREQ_WARN:
|
||
base = warn_txt or "今日操作偏频繁,先休息一会儿。"
|
||
else:
|
||
base = "收到。确认是否按计划执行,别连续加码。"
|
||
return base.strip()[:320]
|
||
|
||
|
||
def build_system_message(event: dict, *, trading_day: str, warnings: list[dict] | None = None) -> str:
|
||
tag = event_tag(str(event.get("event_type") or ""))
|
||
ex = event.get("exchange_name") or event.get("account_name") or "—"
|
||
sym = event.get("symbol") or "—"
|
||
lines = [f"[{tag}] {ex} · {sym}"]
|
||
et = event.get("event_type")
|
||
if et == EVENT_OPEN:
|
||
side = event.get("side") or event.get("direction") or ""
|
||
if side:
|
||
lines.append(f"方向:{side}")
|
||
elif et in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE, EVENT_PROGRAM_TP, EVENT_PROGRAM_SL, EVENT_EXTERNAL):
|
||
res = event.get("result") or ""
|
||
pnl = event.get("pnl_amount")
|
||
if pnl is not None:
|
||
lines.append(f"结果 {res} · 盈亏 {pnl}U")
|
||
else:
|
||
lines.append(f"结果 {res}")
|
||
if event.get("closed_at"):
|
||
lines.append(f"平仓时间 {event.get('closed_at')}")
|
||
for w in warnings or []:
|
||
lines.append(f"⚠ {w.get('message')}")
|
||
lines.append(f"交易日 {trading_day}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def build_wechat_body(
|
||
event: dict,
|
||
*,
|
||
trading_day: str,
|
||
link_base: str,
|
||
system_text: str,
|
||
) -> str:
|
||
link = (link_base or "").strip()
|
||
if link:
|
||
sep = "&" if "?" in link else "?"
|
||
link = f"{link}{sep}day={trading_day}"
|
||
body = system_text.replace("\n", "\n")
|
||
if link:
|
||
body += f"\n详情:{link}"
|
||
return body
|
||
|
||
|
||
def should_send_wechat(event: dict, settings: dict) -> bool:
|
||
if not settings.get("enabled", True):
|
||
return False
|
||
webhook = (settings.get("wechat_webhook") or "").strip()
|
||
if not webhook or "replace-me" in webhook.lower():
|
||
return False
|
||
et = str(event.get("event_type") or "")
|
||
if is_program_event(et):
|
||
return bool(settings.get("wechat_on_program_tp_sl", True))
|
||
if et == EVENT_EXTERNAL:
|
||
return False
|
||
return True
|
||
|
||
|
||
def send_supervisor_wechat(
|
||
event: dict,
|
||
*,
|
||
trading_day: str,
|
||
settings: dict,
|
||
system_text: str,
|
||
) -> bool:
|
||
if not should_send_wechat(event, settings):
|
||
return False
|
||
from wechat_notify_lib import send_wechat_webhook
|
||
|
||
prefix = (settings.get("wechat_prefix") or "【交易监管】").strip()
|
||
body = build_wechat_body(
|
||
event,
|
||
trading_day=trading_day,
|
||
link_base=str(settings.get("wechat_link_base") or ""),
|
||
system_text=system_text,
|
||
)
|
||
return bool(
|
||
send_wechat_webhook(
|
||
str(settings.get("wechat_webhook") or ""),
|
||
body,
|
||
prefix=prefix,
|
||
)
|
||
)
|
||
|
||
|
||
_notify_hook: Optional[Callable[[], None]] = None
|
||
|
||
|
||
def set_supervisor_notify_hook(fn: Optional[Callable[[], None]]) -> None:
|
||
global _notify_hook
|
||
_notify_hook = fn
|
||
|
||
|
||
def _fire_notify() -> None:
|
||
if _notify_hook:
|
||
try:
|
||
_notify_hook()
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def process_supervisor_tick(
|
||
dashboard_payload: dict | None,
|
||
board_payload: dict | None,
|
||
settings_root: dict | None,
|
||
*,
|
||
reset_hour: int = 8,
|
||
ai_reply_fn: Optional[Callable[..., str]] = None,
|
||
) -> dict[str, Any]:
|
||
"""单次监管扫描:对比快照、写会话、推微信、可选 AI 评语。"""
|
||
from hub_ai.supervisor_store import (
|
||
append_supervisor_ai_message,
|
||
append_supervisor_system_message,
|
||
ensure_supervisor_session,
|
||
)
|
||
|
||
sup_cfg = normalize_supervisor_settings((settings_root or {}).get("supervisor"))
|
||
if not sup_cfg.get("enabled", True):
|
||
return {"ok": True, "skipped": True, "reason": "disabled"}
|
||
|
||
dash = dashboard_payload or {}
|
||
trading_day = str(dash.get("trading_day") or current_trading_day(reset_hour=reset_hour))
|
||
state = load_supervisor_state()
|
||
if str(state.get("trading_day") or "") != trading_day:
|
||
state = {
|
||
"version": 1,
|
||
"trading_day": trading_day,
|
||
"processed": [],
|
||
"positions": {},
|
||
"stats": {trading_day: state.get("stats", {}).get(trading_day, {})},
|
||
"positions_baseline_ready": False,
|
||
}
|
||
|
||
processed = set(str(x) for x in (state.get("processed") or []))
|
||
stats = dict(state.get("stats") or {})
|
||
prev_positions = dict(state.get("positions") or {})
|
||
curr_positions = collect_position_keys(board_payload)
|
||
closed_trades = dash.get("closed_trades") or []
|
||
board_ready = _board_agent_snapshot_ready(board_payload)
|
||
|
||
if not state.get("positions_baseline_ready"):
|
||
for trade in closed_trades:
|
||
if isinstance(trade, dict):
|
||
processed.add(f"close:{_trade_event_id(trade)}")
|
||
if not board_ready:
|
||
state["trading_day"] = trading_day
|
||
state["processed"] = list(processed)
|
||
save_supervisor_state(state)
|
||
return {"ok": True, "events": 0, "waiting_board": True, "trading_day": trading_day}
|
||
state["trading_day"] = trading_day
|
||
state["processed"] = list(processed)
|
||
state["positions"] = curr_positions
|
||
state["positions_baseline_ready"] = True
|
||
state["initialized"] = True
|
||
save_supervisor_state(state)
|
||
return {
|
||
"ok": True,
|
||
"events": 0,
|
||
"seeded": True,
|
||
"trading_day": trading_day,
|
||
"positions": len(curr_positions),
|
||
}
|
||
|
||
raw_events = detect_new_opens(prev_positions, curr_positions) + detect_new_closes(
|
||
processed, closed_trades
|
||
)
|
||
if not raw_events:
|
||
state["positions"] = curr_positions
|
||
save_supervisor_state(state)
|
||
return {"ok": True, "events": 0}
|
||
|
||
session = ensure_supervisor_session(trading_day)
|
||
session_id = str(session.get("id") or "")
|
||
handled = 0
|
||
|
||
for event in raw_events:
|
||
eid = str(event.get("event_id") or uuid.uuid4().hex)
|
||
if eid in processed:
|
||
continue
|
||
et = str(event.get("event_type") or "")
|
||
if et == EVENT_EXTERNAL:
|
||
processed.add(eid)
|
||
continue
|
||
|
||
warnings = evaluate_frequency_warnings(
|
||
trading_day=trading_day,
|
||
event=event,
|
||
stats=stats,
|
||
settings=sup_cfg,
|
||
)
|
||
if is_supervised_event(et):
|
||
_record_supervised_event(stats, trading_day, event)
|
||
|
||
system_text = build_system_message(event, trading_day=trading_day, warnings=warnings)
|
||
append_supervisor_system_message(
|
||
session_id,
|
||
system_text,
|
||
event_type=et,
|
||
level="warn" if warnings else "info",
|
||
)
|
||
send_supervisor_wechat(
|
||
event,
|
||
trading_day=trading_day,
|
||
settings=sup_cfg,
|
||
system_text=system_text,
|
||
)
|
||
for w in warnings:
|
||
warn_event = {
|
||
"event_type": EVENT_FREQ_WARN,
|
||
"event_id": f"warn:{eid}:{w.get('rule')}",
|
||
**event,
|
||
"warn_message": w.get("message"),
|
||
}
|
||
warn_text = f"[{event_tag(EVENT_FREQ_WARN)}] {w.get('message')}"
|
||
append_supervisor_system_message(
|
||
session_id,
|
||
warn_text,
|
||
event_type=EVENT_FREQ_WARN,
|
||
level="warn",
|
||
)
|
||
send_supervisor_wechat(
|
||
warn_event,
|
||
trading_day=trading_day,
|
||
settings=sup_cfg,
|
||
system_text=warn_text,
|
||
)
|
||
|
||
if ai_reply_fn and et != EVENT_EXTERNAL:
|
||
evt_snapshot = dict(event)
|
||
evt_warnings = list(warnings)
|
||
|
||
def _ai_bg() -> None:
|
||
try:
|
||
reply = ai_reply_fn(
|
||
event=evt_snapshot,
|
||
warnings=evt_warnings,
|
||
trading_day=trading_day,
|
||
session_id=session_id,
|
||
)
|
||
from hub_ai.text_util import is_ai_error_reply
|
||
|
||
text = str(reply or "").strip()
|
||
if not text or is_ai_error_reply(text):
|
||
text = build_supervisor_fallback_reply(evt_snapshot, evt_warnings)
|
||
if text:
|
||
append_supervisor_ai_message(session_id, text)
|
||
_fire_notify()
|
||
except Exception:
|
||
try:
|
||
fb = build_supervisor_fallback_reply(evt_snapshot, evt_warnings)
|
||
if fb:
|
||
append_supervisor_ai_message(session_id, fb)
|
||
_fire_notify()
|
||
except Exception:
|
||
pass
|
||
|
||
threading.Thread(target=_ai_bg, daemon=True).start()
|
||
|
||
processed.add(eid)
|
||
handled += 1
|
||
|
||
state["trading_day"] = trading_day
|
||
state["processed"] = list(processed)
|
||
state["positions"] = curr_positions
|
||
state["stats"] = stats
|
||
save_supervisor_state(state)
|
||
if handled:
|
||
_fire_notify()
|
||
return {"ok": True, "events": handled, "trading_day": trading_day, "session_id": session_id}
|