Files
crypto_monitor/manual_trading_hub/hub_supervisor_lib.py
T
2026-06-23 20:20:33 +08:00

758 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""交易监管:事件分类、频率规则、会话消息与企业微信推送。"""
from __future__ import annotations
import json
import os
import threading
import uuid
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable, Optional
from hub_trades_lib import current_trading_day, parse_dt_for_trading_day
# Directory that contains this module; the supervisor state file lives beside it.
HUB_DIR = Path(__file__).resolve().parent
# JSON file read by load_supervisor_state() and written by save_supervisor_state().
STATE_PATH = HUB_DIR / "hub_supervisor_state.json"
# Close-result labels as they appear in a trade's "result" field. The strings
# are runtime data and must not be altered.
# Program exits: take-profit, stop-loss, break-even take-profit, trailing take-profit.
PROGRAM_RESULTS = frozenset({"止盈", "止损", "保本止盈", "移动止盈"})
# Manual close.
MANUAL_CLOSE_RESULTS = frozenset({"手动平仓"})
# Forced flatten; classify_close_result() maps it to EVENT_HUB_CLOSE.
HUB_CLOSE_RESULTS = frozenset({"强制清仓"})
# External close / time-based close; classified as EVENT_EXTERNAL.
WEAK_RESULTS = frozenset({"外部平仓", "时间平仓"})
# Event type identifiers used in event dicts and passed to the session store.
EVENT_OPEN = "open"
EVENT_MANUAL_CLOSE = "manual_close"
EVENT_HUB_CLOSE = "hub_close"
EVENT_PROGRAM_TP = "program_tp"
EVENT_PROGRAM_SL = "program_sl"
EVENT_EXTERNAL = "external"
EVENT_FREQ_WARN = "freq_warn"
# Default supervisor settings; normalize_supervisor_settings() starts from a
# copy of this dict and only accepts overrides for the keys listed here.
DEFAULT_SUPERVISOR = {
    # When false, process_supervisor_tick() skips the scan and no WeChat message is sent.
    "enabled": True,
    # Webhook URL handed to send_wechat_webhook(); SUPERVISOR_WECHAT_WEBHOOK can supply it.
    "wechat_webhook": "",
    # Base link appended to WeChat bodies with a "day=" query; SUPERVISOR_WECHAT_LINK can supply it.
    "wechat_link_base": "http://127.0.0.1:5100/ai?mode=supervisor",
    # Prefix passed to send_wechat_webhook().
    "wechat_prefix": "【交易监管】",
    # Whether program take-profit / stop-loss events are pushed to WeChat.
    "wechat_on_program_tp_sl": True,
    # Threshold for the DAILY_COUNT rule (supervised closes per trading day).
    "manual_close_daily_warn": 2,
    # Threshold in minutes for the INTERVAL_SHORT rule (gap between two supervised closes).
    "interval_warn_minutes": 15,
    # Threshold for the FREQ_30M rule (supervised closes within 30 minutes).
    "freq_30m_count": 2,
    # Threshold in minutes for the REOPEN_FAST rule (open shortly after a supervised close).
    "reopen_after_close_minutes": 30,
}
def _now_str() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _atomic_write(path: Path, data: dict) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
os.replace(tmp, path)
def _load_json(path: Path, default: dict) -> dict:
if not path.is_file():
return dict(default)
try:
loaded = json.loads(path.read_text(encoding="utf-8"))
if isinstance(loaded, dict):
return loaded
except Exception:
pass
return dict(default)
def normalize_supervisor_settings(raw: dict | None) -> dict:
    """Build the effective supervisor settings from defaults, environment and *raw*.

    Starts from a copy of ``DEFAULT_SUPERVISOR``, then applies non-empty
    ``SUPERVISOR_WECHAT_WEBHOOK`` / ``SUPERVISOR_WECHAT_LINK`` environment
    values, then applies the keys of *raw* that also exist in the defaults.
    Values present in *raw* therefore take precedence over the environment.

    Boolean keys are coerced with ``bool``; integer keys are coerced with
    ``int`` and clamped to at least 1 (an unparsable value keeps the previous
    setting); any other key is only accepted when the value is a string, which
    is stripped.
    """
    flag_keys = ("enabled", "wechat_on_program_tp_sl")
    count_keys = (
        "manual_close_daily_warn",
        "freq_30m_count",
        "interval_warn_minutes",
        "reopen_after_close_minutes",
    )

    settings = dict(DEFAULT_SUPERVISOR)
    for env_name, target in (
        ("SUPERVISOR_WECHAT_WEBHOOK", "wechat_webhook"),
        ("SUPERVISOR_WECHAT_LINK", "wechat_link_base"),
    ):
        env_value = (os.getenv(env_name) or "").strip()
        if env_value:
            settings[target] = env_value

    if not isinstance(raw, dict):
        return settings

    for name in DEFAULT_SUPERVISOR:
        if name not in raw:
            continue
        value = raw.get(name)
        if name in flag_keys:
            settings[name] = bool(value)
        elif name in count_keys:
            try:
                settings[name] = max(1, int(value))
            except (TypeError, ValueError):
                # Unparsable number: keep whatever value is already in place.
                pass
        elif isinstance(value, str):
            settings[name] = value.strip()
    return settings
def load_supervisor_state() -> dict:
    """Read the persisted supervisor state from ``STATE_PATH``.

    A missing or unreadable file yields a fresh skeleton. The ``version``,
    ``processed``, ``positions`` and ``stats`` keys are guaranteed to exist in
    the returned dict.
    """
    skeleton = {"version": 1, "trading_day": "", "processed": [], "positions": {}, "stats": {}}
    state = _load_json(STATE_PATH, skeleton)
    for field, empty in (("version", 1), ("processed", []), ("positions", {}), ("stats", {})):
        state.setdefault(field, empty)
    return state
def save_supervisor_state(data: dict) -> None:
    """Persist *data* to ``STATE_PATH``, keeping at most the last 500 processed ids.

    ``data["processed"]`` is replaced in place with the (possibly truncated)
    list before the file is written.
    """
    keep_last = 500
    data["processed"] = list(data.get("processed") or [])[-keep_last:]
    _atomic_write(STATE_PATH, data)
def _trade_event_id(trade: dict) -> str:
return "|".join(
[
str(trade.get("account_name") or trade.get("account_key") or ""),
str(trade.get("symbol") or ""),
str(trade.get("closed_at") or ""),
str(trade.get("result") or ""),
str(trade.get("pnl_amount") or ""),
]
)
def classify_close_result(result: str) -> str:
    """Map a trade's close-result label to an ``EVENT_*`` identifier.

    Program results map to ``EVENT_PROGRAM_SL`` for a stop-loss and
    ``EVENT_PROGRAM_TP`` otherwise; manual and hub results map to their own
    event types. ``WEAK_RESULTS`` and any unrecognised label are reported as
    ``EVENT_EXTERNAL``.
    """
    label = (result or "").strip()
    if label in PROGRAM_RESULTS:
        return EVENT_PROGRAM_SL if label == "止损" else EVENT_PROGRAM_TP
    for labels, event_type in (
        (MANUAL_CLOSE_RESULTS, EVENT_MANUAL_CLOSE),
        (HUB_CLOSE_RESULTS, EVENT_HUB_CLOSE),
    ):
        if label in labels:
            return event_type
    # Weak results and everything else fall through to the same answer.
    return EVENT_EXTERNAL
def is_supervised_event(event_type: str) -> bool:
    """Return True for events subject to frequency rules: open, manual close, hub close."""
    supervised = (EVENT_OPEN, EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE)
    return any(event_type == candidate for candidate in supervised)
def is_program_event(event_type: str) -> bool:
    """Return True when the event is a program take-profit or program stop-loss."""
    return event_type == EVENT_PROGRAM_TP or event_type == EVENT_PROGRAM_SL
def _normalize_position_symbol(sym: str) -> str:
"""统一合约名,避免 ZEC/USDT 与 ZEC/USDT:USDT 被当成两笔持仓。"""
s = (sym or "").strip().upper()
if not s:
return ""
if s.endswith(":USDT") and "/" in s:
return s.rsplit(":", 1)[0]
return s
def _position_key(exchange_id: str, symbol: str, side: str) -> str:
    """Compose the ``exchange|symbol|side`` key that identifies one position.

    The symbol is normalised via ``_normalize_position_symbol``; the side is
    lower-cased and defaults to ``long`` when empty.
    """
    contract = _normalize_position_symbol(symbol)
    direction = (side or "long").strip().lower()
    if not direction:
        direction = "long"
    return f"{exchange_id}|{contract}|{direction}"
def _position_contracts(pos: dict) -> float:
for key in ("contracts", "contracts_signed", "size"):
try:
v = pos.get(key)
if v is not None and v != "":
return abs(float(v))
except (TypeError, ValueError):
continue
return 0.0
def collect_position_keys(board_payload: dict | None) -> dict[str, dict]:
    """Flatten the board payload into a ``position key -> position info`` mapping.

    Every dict row under ``rows`` contributes the dict entries of its
    ``agent.positions`` list whose size is at least ``1e-12``. Each value
    records the exchange id and name, the symbol as reported, the lower-cased
    side (defaulting to ``long``) and the absolute contract count.
    """
    snapshot: dict[str, dict] = {}
    for account in (board_payload or {}).get("rows") or []:
        if not isinstance(account, dict):
            continue
        account_id = str(account.get("id") or account.get("key") or "")
        account_name = str(account.get("name") or account.get("key") or account_id)
        agent = account.get("agent") or {}
        for position in agent.get("positions") or []:
            if not isinstance(position, dict):
                continue
            size = _position_contracts(position)
            if size < 1e-12:
                continue
            symbol = str(position.get("symbol") or "")
            side = str(position.get("side") or "").lower() or "long"
            snapshot[_position_key(account_id, symbol, side)] = {
                "exchange_id": account_id,
                "exchange_name": account_name,
                "symbol": symbol,
                "side": side,
                "contracts": size,
            }
    return snapshot
def _board_agent_snapshot_ready(board_payload: dict | None) -> bool:
"""监控板各启用账户 agent 快照已就绪(避免空板先入库导致后续持仓误判为新开)。"""
if not isinstance(board_payload, dict) or board_payload.get("ok") is False:
return False
rows = board_payload.get("rows") or []
if not rows:
return False
seen = 0
for row in rows:
if not isinstance(row, dict):
continue
if row.get("enabled") is False:
continue
ag = row.get("agent")
if not isinstance(ag, dict):
return False
seen += 1
return seen > 0
def _entry_contracts(entry: dict | None) -> float:
if not isinstance(entry, dict):
return 0.0
try:
return float(entry.get("contracts") or 0)
except (TypeError, ValueError):
return 0.0
def detect_new_opens(
    prev_positions: dict[str, dict],
    curr_positions: dict[str, dict],
) -> list[dict]:
    """Report positions that went from flat to held between two snapshots.

    Adding to a position that already existed is not an open. Each event
    carries ``event_type`` ``EVENT_OPEN``, an ``event_id`` built from the
    position key and the current time truncated to the minute, and the fields
    of the current position entry.
    """
    flat_below = 1e-12
    minute_stamp_len = 16  # "YYYY-MM-DD HH:MM"
    return [
        {
            "event_type": EVENT_OPEN,
            "event_id": f"open:{key}:{_now_str()[:minute_stamp_len]}",
            **info,
        }
        for key, info in curr_positions.items()
        if not (_entry_contracts(info) < flat_below)
        and not (_entry_contracts(prev_positions.get(key)) >= flat_below)
    ]
def detect_new_closes(
    prev_processed: set[str],
    closed_trades: list[dict],
) -> list[dict]:
    """Turn closed trades that have not been handled yet into close events.

    Args:
        prev_processed: Ids of events that were already handled. Only
            membership tests are performed on it.
        closed_trades: Closed-trade dicts; non-dict entries are ignored and a
            falsy value is treated as an empty list.

    Returns:
        One event dict per unseen trade, with ``event_type`` from
        ``classify_close_result``, ``event_id`` of the form ``close:<trade id>``
        and the trade's account, symbol, direction, result, PnL and close time.
    """
    events = []
    for trade in closed_trades or []:
        if not isinstance(trade, dict):
            continue
        raw_id = _trade_event_id(trade)
        event_id = f"close:{raw_id}"
        # process_supervisor_tick stores handled ids in the prefixed form, so
        # testing only the raw id never matched and every trade was re-emitted
        # on every tick. The raw form is still honoured for compatibility.
        if event_id in prev_processed or raw_id in prev_processed:
            continue
        events.append(
            {
                "event_type": classify_close_result(str(trade.get("result") or "")),
                "event_id": event_id,
                "account_name": trade.get("account_name"),
                "symbol": trade.get("symbol"),
                "direction": trade.get("direction"),
                "result": trade.get("result"),
                "pnl_amount": trade.get("pnl_amount"),
                "closed_at": trade.get("closed_at"),
            }
        )
    return events
def _parse_event_dt(raw: Any) -> Optional[datetime]:
    """Parse an event timestamp by delegating to ``parse_dt_for_trading_day``.

    NOTE(review): the accepted input formats and the naive/aware nature of the
    result are defined by that helper, not here.
    """
    parsed = parse_dt_for_trading_day(raw)
    return parsed
def _supervised_close_times(stats: dict, trading_day: str) -> list[datetime]:
    """Return the sorted close times recorded for *trading_day*.

    Dict entries are read from ``closed_at`` (falling back to ``at``); any
    other entry is parsed as a timestamp itself. Entries that do not parse are
    dropped.
    """
    entries = (stats.get(trading_day) or {}).get("supervised_closes") or []
    parsed = (
        _parse_event_dt(entry.get("closed_at") or entry.get("at"))
        if isinstance(entry, dict)
        else _parse_event_dt(entry)
        for entry in entries
    )
    return sorted(moment for moment in parsed if moment)
def _record_supervised_event(stats: dict, trading_day: str, event: dict) -> None:
    """Append a supervised event to the per-day history kept in *stats*.

    Opens go to ``supervised_opens`` and manual/hub closes go to
    ``supervised_closes``; any other event type is ignored. Each history keeps
    only its 50 most recent entries. The day bucket is created in *stats* even
    when the event is ignored.
    """
    day_stats = stats.setdefault(trading_day, {})
    kind = str(event.get("event_type") or "")
    if kind == EVENT_OPEN:
        bucket = "supervised_opens"
    elif kind in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE):
        bucket = "supervised_closes"
    else:
        return

    history = list(day_stats.get(bucket) or [])
    if bucket == "supervised_opens":
        history.append({"at": _now_str(), "symbol": event.get("symbol")})
    else:
        history.append(
            {
                "at": _now_str(),
                "closed_at": event.get("closed_at"),
                "event_type": kind,
                "pnl_amount": event.get("pnl_amount"),
            }
        )
    day_stats[bucket] = history[-50:]
def evaluate_frequency_warnings(
    *,
    trading_day: str,
    event: dict,
    stats: dict,
    settings: dict,
) -> list[dict]:
    """Evaluate the over-trading rules for one supervised event.

    The event itself must not have been recorded in *stats* yet; it is counted
    on top of the recorded history.

    Args:
        trading_day: Key of the day bucket inside *stats*.
        event: Event dict; only open, manual-close and hub-close events are
            evaluated, anything else yields an empty list.
        stats: Per-day history as maintained by ``_record_supervised_event``.
            The day bucket is created when missing.
        settings: Supervisor settings providing the rule thresholds.

    Returns:
        A list of ``{"rule", "message"}`` dicts, at most one per rule, in the
        order INTERVAL_SHORT, FREQ_30M, DAILY_COUNT, REOPEN_FAST, LOSS_STREAK.
    """

    def _pnl(value: Any) -> float:
        # Missing or unparsable PnL counts as break-even.
        try:
            return float(value or 0)
        except (TypeError, ValueError):
            return 0.0

    event_type = event.get("event_type")
    if not is_supervised_event(str(event_type or "")):
        return []
    is_close = event_type in (EVENT_MANUAL_CLOSE, EVENT_HUB_CLOSE)
    is_open = event_type == EVENT_OPEN

    warnings: list[dict] = []
    day_stats = stats.setdefault(trading_day, {})
    closes = _supervised_close_times(stats, trading_day)
    now = datetime.now()
    if is_close:
        # Include the close being evaluated; fall back to "now" without a timestamp.
        closes.append(_parse_event_dt(event.get("closed_at")) or now)
        closes.sort()

    recorded_closes = day_stats.get("supervised_closes") or []
    close_count = len(recorded_closes) + (1 if is_close else 0)

    interval_min = int(settings.get("interval_warn_minutes") or 15)
    daily_warn = int(settings.get("manual_close_daily_warn") or 2)
    freq_30m = int(settings.get("freq_30m_count") or 2)
    reopen_min = int(settings.get("reopen_after_close_minutes") or 30)

    if is_close:
        if len(closes) >= 2:
            gap = (closes[-1] - closes[-2]).total_seconds() / 60.0
            if gap < interval_min:
                warnings.append(
                    {
                        "rule": "INTERVAL_SHORT",
                        "message": f"两笔手动/中控平间隔仅 {int(gap)} 分钟(阈值 {interval_min} 分钟)",
                    }
                )
        recent_closes = [t for t in closes if (now - t).total_seconds() <= 30 * 60]
        if len(recent_closes) >= freq_30m:
            warnings.append(
                {
                    "rule": "FREQ_30M",
                    "message": f"30 分钟内手动/中控平已达 {len(recent_closes)} 笔(阈值 {freq_30m} 笔)",
                }
            )

    # Applies to opens as well: opening after the daily close budget is used up
    # still triggers the reminder.
    if close_count >= daily_warn:
        warnings.append(
            {
                "rule": "DAILY_COUNT",
                "message": f"今日手动/中控平 {close_count} 笔(阈值 {daily_warn} 笔),注意过度交易",
            }
        )

    if is_open and closes:
        gap_open = (now - closes[-1]).total_seconds() / 60.0
        if gap_open < reopen_min:
            warnings.append(
                {
                    "rule": "REOPEN_FAST",
                    "message": f"距上一笔手动/中控平仅 {int(gap_open)} 分钟又新开仓(阈值 {reopen_min} 分钟)",
                }
            )

    if is_close and _pnl(event.get("pnl_amount")) < 0:
        # Current loss plus the run of losses that immediately precedes it.
        loss_streak = 1
        for item in reversed(recorded_closes):
            # Non-dict history entries carry no PnL; they end the streak
            # instead of raising AttributeError.
            prior_pnl = _pnl(item.get("pnl_amount")) if isinstance(item, dict) else 0.0
            if prior_pnl < 0:
                loss_streak += 1
            else:
                break
        if loss_streak >= 2:
            warnings.append(
                {
                    "rule": "LOSS_STREAK",
                    "message": f"连续 {loss_streak} 笔手动/中控亏损,先停一停",
                }
            )

    # Each rule is appended at most once above, so no de-duplication is needed.
    return warnings
def event_tag(event_type: str) -> str:
    """Return the display tag for an event type, or the generic tag when unknown."""
    tags = {
        EVENT_OPEN: "监管·开仓",
        EVENT_MANUAL_CLOSE: "监管·手动平",
        EVENT_HUB_CLOSE: "监管·中控平",
        EVENT_PROGRAM_TP: "监管·程序止盈",
        EVENT_PROGRAM_SL: "监管·程序止损",
        EVENT_EXTERNAL: "监管·外部平",
        EVENT_FREQ_WARN: "监管·频率",
    }
    if event_type in tags:
        return tags[event_type]
    return "监管"
def _fmt_pnl_u(pnl: Any) -> str:
try:
v = float(pnl)
sign = "+" if v > 0 else ""
return f"{sign}{v:.4f}".rstrip("0").rstrip(".") + "U"
except (TypeError, ValueError):
return ""
def build_supervisor_fallback_reply(event: dict, warnings: list[dict] | None = None) -> str:
    """Compose the short canned comment used when the AI is unavailable or returns nothing.

    The wording depends on the event type. Manual/hub closes append the first
    warning message when there is one, and frequency warnings use it as the
    whole reply. No error text is ever included. The result is stripped and
    capped at 320 characters.
    """
    kind = str(event.get("event_type") or "")
    symbol = str(event.get("symbol") or "")
    account = str(event.get("exchange_name") or event.get("account_name") or "").strip()
    # Empty when the event carries no usable PnL, so it can be concatenated unconditionally.
    pnl_text = _fmt_pnl_u(event.get("pnl_amount"))
    first_warning = (warnings or [])[:1]
    warn_text = str(first_warning[0].get("message") or "").strip() if first_warning else ""

    if kind == EVENT_PROGRAM_SL:
        reply = f"{symbol} 程序止损" + pnl_text + ",按计划出场是纪律。先歇一会儿,别急着马上再开。"
    elif kind == EVENT_PROGRAM_TP:
        reply = f"{symbol} 程序止盈" + pnl_text + ",执行不错。保持节奏,别立刻反手再开一单。"
    elif kind == EVENT_OPEN:
        who = f"{account} " if account else ""
        reply = f"看到 {who}新开 {symbol}。动手前确认是不是计划内,别因为上一笔情绪再开。"
    elif kind == EVENT_HUB_CLOSE:
        tail = f" {warn_text}" if warn_text else " 停一停,别连着手痒。"
        reply = f"中控平了 {symbol}" + pnl_text + tail
    elif kind == EVENT_MANUAL_CLOSE:
        tail = f" {warn_text}" if warn_text else " 想好再开下一单。"
        reply = f"手动平了 {symbol}" + pnl_text + tail
    elif kind == EVENT_FREQ_WARN:
        reply = warn_text or "今日操作偏频繁,先休息一会儿。"
    else:
        reply = "收到。确认是否按计划执行,别连续加码。"
    return reply.strip()[:320]
def build_system_message(event: dict, *, trading_day: str, warnings: list[dict] | None = None) -> str:
    """Render the multi-line system message that describes *event*.

    The first line holds the event tag, account and symbol. Opens add the
    direction when known; close events add the result (with PnL when present)
    and the close time. Warning messages follow one per line, and the trading
    day comes last.
    """
    kind = event.get("event_type")
    tag = event_tag(str(kind or ""))
    account = event.get("exchange_name") or event.get("account_name") or ""
    symbol = event.get("symbol") or ""
    close_kinds = (
        EVENT_MANUAL_CLOSE,
        EVENT_HUB_CLOSE,
        EVENT_PROGRAM_TP,
        EVENT_PROGRAM_SL,
        EVENT_EXTERNAL,
    )

    parts = [f"[{tag}] {account} · {symbol}"]
    if kind == EVENT_OPEN:
        direction = event.get("side") or event.get("direction") or ""
        if direction:
            parts.append(f"方向:{direction}")
    elif kind in close_kinds:
        outcome = event.get("result") or ""
        amount = event.get("pnl_amount")
        if amount is None:
            parts.append(f"结果 {outcome}")
        else:
            parts.append(f"结果 {outcome} · 盈亏 {amount}U")
        if event.get("closed_at"):
            parts.append(f"平仓时间 {event.get('closed_at')}")
    parts.extend(f"{item.get('message')}" for item in warnings or [])
    parts.append(f"交易日 {trading_day}")
    return "\n".join(parts)
def build_wechat_body(
    event: dict,
    *,
    trading_day: str,
    link_base: str,
    system_text: str,
) -> str:
    """Return the WeChat message body: the system text plus an optional detail link.

    When *link_base* is non-blank, a ``day=<trading_day>`` query parameter is
    appended to it (with ``&`` if it already has a query string, else ``?``)
    and the link is added on its own line. *event* is accepted for interface
    symmetry and is not read.
    """
    base = (link_base or "").strip()
    # NOTE(review): this replace maps "\n" to itself and leaves the text
    # unchanged; it is kept as-is to preserve behaviour.
    text = system_text.replace("\n", "\n")
    if not base:
        return text
    joiner = "&" if "?" in base else "?"
    return text + f"\n详情:{base}{joiner}day={trading_day}"
def should_send_wechat(event: dict, settings: dict) -> bool:
    """Decide whether *event* should be pushed to WeChat.

    Nothing is sent when supervision is disabled, when the webhook is blank or
    still contains the ``replace-me`` placeholder, or for external closes.
    Program take-profit / stop-loss events follow the
    ``wechat_on_program_tp_sl`` setting; every other event is sent.
    """
    if not settings.get("enabled", True):
        return False
    url = (settings.get("wechat_webhook") or "").strip()
    configured = bool(url) and "replace-me" not in url.lower()
    if not configured:
        return False
    kind = str(event.get("event_type") or "")
    if is_program_event(kind):
        return bool(settings.get("wechat_on_program_tp_sl", True))
    return kind != EVENT_EXTERNAL
def send_supervisor_wechat(
    event: dict,
    *,
    trading_day: str,
    settings: dict,
    system_text: str,
) -> bool:
    """Push *system_text* for *event* to the configured WeChat webhook.

    Returns False without sending when ``should_send_wechat`` rejects the
    event; otherwise returns the truthiness of ``send_wechat_webhook``'s result.
    """
    if not should_send_wechat(event, settings):
        return False
    # Imported lazily, only once a message is actually going out.
    from wechat_notify_lib import send_wechat_webhook

    title_prefix = (settings.get("wechat_prefix") or "【交易监管】").strip()
    message = build_wechat_body(
        event,
        trading_day=trading_day,
        link_base=str(settings.get("wechat_link_base") or ""),
        system_text=system_text,
    )
    webhook_url = str(settings.get("wechat_webhook") or "")
    delivered = send_wechat_webhook(webhook_url, message, prefix=title_prefix)
    return bool(delivered)
_notify_hook: Optional[Callable[[], None]] = None
def set_supervisor_notify_hook(fn: Optional[Callable[[], None]]) -> None:
global _notify_hook
_notify_hook = fn
def _fire_notify() -> None:
if _notify_hook:
try:
_notify_hook()
except Exception:
pass
def process_supervisor_tick(
    dashboard_payload: dict | None,
    board_payload: dict | None,
    settings_root: dict | None,
    *,
    reset_hour: int = 8,
    ai_reply_fn: Optional[Callable[..., str]] = None,
) -> dict[str, Any]:
    """Run one supervision scan.

    Compares the current board/dashboard snapshot with the persisted state,
    writes session messages for new events, pushes WeChat notifications and,
    when *ai_reply_fn* is given, requests an AI comment in the background.

    Args:
        dashboard_payload: Dashboard data; ``trading_day`` and
            ``closed_trades`` are read from it.
        board_payload: Monitoring-board data used to derive open positions.
        settings_root: Settings dict whose ``supervisor`` entry is normalised
            with ``normalize_supervisor_settings``.
        reset_hour: Passed to ``current_trading_day`` when the dashboard does
            not provide a trading day.
        ai_reply_fn: Optional callable invoked with ``event``, ``warnings``,
            ``trading_day`` and ``session_id`` keyword arguments on a daemon
            thread. When it fails or returns an empty/error reply, a canned
            fallback comment is stored instead.

    Returns:
        A status dict with ``ok`` and, depending on the path taken,
        ``skipped``/``reason`` (supervision disabled), ``waiting_board`` (board
        not ready for the baseline), ``seeded`` (baseline stored) or
        ``events``/``session_id`` (events handled).
    """
    from hub_ai.supervisor_store import (
        append_supervisor_ai_message,
        append_supervisor_system_message,
        ensure_supervisor_session,
    )

    sup_cfg = normalize_supervisor_settings((settings_root or {}).get("supervisor"))
    if not sup_cfg.get("enabled", True):
        return {"ok": True, "skipped": True, "reason": "disabled"}

    dash = dashboard_payload or {}
    trading_day = str(dash.get("trading_day") or current_trading_day(reset_hour=reset_hour))
    state = load_supervisor_state()
    if str(state.get("trading_day") or "") != trading_day:
        # New trading day: drop the old baseline and history, keep only this day's stats.
        state = {
            "version": 1,
            "trading_day": trading_day,
            "processed": [],
            "positions": {},
            "stats": {trading_day: state.get("stats", {}).get(trading_day, {})},
            "positions_baseline_ready": False,
        }

    # An insertion-ordered dict is used as an ordered set: save_supervisor_state
    # keeps only the last 500 ids, which is arbitrary if the order comes from a set.
    processed: dict[str, None] = dict.fromkeys(
        str(x) for x in (state.get("processed") or [])
    )
    stats = dict(state.get("stats") or {})
    prev_positions = dict(state.get("positions") or {})
    curr_positions = collect_position_keys(board_payload)
    closed_trades = dash.get("closed_trades") or []
    board_ready = _board_agent_snapshot_ready(board_payload)

    if not state.get("positions_baseline_ready"):
        # First pass of the day: everything already closed is history, not news.
        for trade in closed_trades:
            if isinstance(trade, dict):
                processed[f"close:{_trade_event_id(trade)}"] = None
        state["trading_day"] = trading_day
        state["processed"] = list(processed)
        if not board_ready:
            # Do not store an empty board as the baseline; wait for agent snapshots.
            save_supervisor_state(state)
            return {"ok": True, "events": 0, "waiting_board": True, "trading_day": trading_day}
        state["positions"] = curr_positions
        state["positions_baseline_ready"] = True
        state["initialized"] = True
        save_supervisor_state(state)
        return {
            "ok": True,
            "events": 0,
            "seeded": True,
            "trading_day": trading_day,
            "positions": len(curr_positions),
        }

    raw_events = detect_new_opens(prev_positions, curr_positions) + detect_new_closes(
        processed.keys(), closed_trades
    )
    if not raw_events:
        state["positions"] = curr_positions
        save_supervisor_state(state)
        return {"ok": True, "events": 0}

    session = ensure_supervisor_session(trading_day)
    session_id = str(session.get("id") or "")

    def _ai_bg(evt_snapshot: dict, evt_warnings: list) -> None:
        # The event data arrives as arguments instead of being read from the
        # enclosing loop: a closure would see the values of whichever event the
        # loop has reached by the time the (slow) AI call returns.
        try:
            reply = ai_reply_fn(
                event=evt_snapshot,
                warnings=evt_warnings,
                trading_day=trading_day,
                session_id=session_id,
            )
            from hub_ai.text_util import is_ai_error_reply

            text = str(reply or "").strip()
            if not text or is_ai_error_reply(text):
                text = build_supervisor_fallback_reply(evt_snapshot, evt_warnings)
            if text:
                append_supervisor_ai_message(session_id, text)
                _fire_notify()
        except Exception:
            # The AI path failed: still leave a canned comment, and never let
            # the background thread raise.
            try:
                fb = build_supervisor_fallback_reply(evt_snapshot, evt_warnings)
                if fb:
                    append_supervisor_ai_message(session_id, fb)
                    _fire_notify()
            except Exception:
                pass

    handled = 0
    for event in raw_events:
        eid = str(event.get("event_id") or uuid.uuid4().hex)
        if eid in processed:
            continue
        et = str(event.get("event_type") or "")
        if et == EVENT_EXTERNAL:
            # External closes are remembered but produce no message.
            processed[eid] = None
            continue

        # Warnings are evaluated before the event is recorded so that the
        # event is not counted twice.
        warnings = evaluate_frequency_warnings(
            trading_day=trading_day,
            event=event,
            stats=stats,
            settings=sup_cfg,
        )
        if is_supervised_event(et):
            _record_supervised_event(stats, trading_day, event)

        system_text = build_system_message(event, trading_day=trading_day, warnings=warnings)
        append_supervisor_system_message(
            session_id,
            system_text,
            event_type=et,
            level="warn" if warnings else "info",
        )
        send_supervisor_wechat(
            event,
            trading_day=trading_day,
            settings=sup_cfg,
            system_text=system_text,
        )

        for w in warnings:
            # The source event's fields come first so that the warning's own
            # type and id are not overwritten by them.
            warn_event = {
                **event,
                "event_type": EVENT_FREQ_WARN,
                "event_id": f"warn:{eid}:{w.get('rule')}",
                "warn_message": w.get("message"),
            }
            warn_text = f"[{event_tag(EVENT_FREQ_WARN)}] {w.get('message')}"
            append_supervisor_system_message(
                session_id,
                warn_text,
                event_type=EVENT_FREQ_WARN,
                level="warn",
            )
            send_supervisor_wechat(
                warn_event,
                trading_day=trading_day,
                settings=sup_cfg,
                system_text=warn_text,
            )

        if ai_reply_fn:
            threading.Thread(
                target=_ai_bg,
                args=(dict(event), list(warnings)),
                daemon=True,
            ).start()

        processed[eid] = None
        handled += 1

    state["trading_day"] = trading_day
    state["processed"] = list(processed)
    state["positions"] = curr_positions
    state["stats"] = stats
    save_supervisor_state(state)
    if handled:
        _fire_notify()
    return {"ok": True, "events": handled, "trading_day": trading_day, "session_id": session_id}