"""中控资金概况:分户日快照(180 交易日)、总资金曲线与回撤。""" from __future__ import annotations import json import os from datetime import datetime, timedelta from pathlib import Path from typing import Any, Optional from hub_trades_lib import current_trading_day HUB_DIR = Path(__file__).resolve().parent / "manual_trading_hub" FUND_HISTORY_PATH = HUB_DIR / "hub_fund_history.json" LEGACY_FUND_HISTORY_PATH = HUB_DIR / "hub_ai_fund_history.json" try: FUND_HISTORY_DAYS = max(30, int(os.getenv("HUB_FUND_HISTORY_DAYS", "180") or "180")) except ValueError: FUND_HISTORY_DAYS = 180 FUND_HISTORY_START_DAY = (os.getenv("HUB_FUND_HISTORY_START_DAY") or "2026-06-09").strip()[:10] def fund_history_start_day() -> str: return FUND_HISTORY_START_DAY or "2026-06-09" def _now_str() -> str: return datetime.now().strftime("%Y-%m-%d %H:%M:%S") def _safe_float(value: Any) -> Optional[float]: try: v = float(value) return v if v >= 0 else None except (TypeError, ValueError): return None def account_total_usdt(funding: Any, trading: Any) -> Optional[float]: """资金户 + 交易户;任一侧缺失则不计入(返回 None)。""" fu = _safe_float(funding) tu = _safe_float(trading) if fu is None or tu is None: return None return round(fu + tu, 4) def compute_drawdown(values: list[float]) -> dict[str, Any]: """基于资金权益序列计算峰值回撤(U 与 %)。""" peak = 0.0 max_dd_u = 0.0 peak_at_end = 0.0 for v in values: if not isinstance(v, (int, float)): continue fv = float(v) if fv > peak: peak = fv dd = peak - fv if dd > max_dd_u: max_dd_u = dd peak_at_end = peak max_dd_u = round(max_dd_u, 4) peak_at_end = round(peak_at_end, 4) max_dd_pct = round((max_dd_u / peak_at_end) * 100, 2) if peak_at_end > 0 else None return { "peak_usdt": peak_at_end, "max_drawdown_u": max_dd_u, "max_drawdown_pct": max_dd_pct, } def _atomic_write(path: Path, data: dict) -> None: path.parent.mkdir(parents=True, exist_ok=True) tmp = path.with_suffix(path.suffix + ".tmp") tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") os.replace(tmp, path) def _prune_days( days: dict, *, keep_days: int, anchor_day: str, start_day: Optional[str] = None, ) -> dict: try: anchor = datetime.strptime(anchor_day[:10], "%Y-%m-%d") except ValueError: anchor = datetime.now() rolling_cutoff = (anchor - timedelta(days=max(1, keep_days) - 1)).strftime("%Y-%m-%d") start = (start_day or fund_history_start_day()).strip()[:10] cutoff = max(rolling_cutoff, start) if start else rolling_cutoff return {k: v for k, v in (days or {}).items() if str(k) >= cutoff} def _migrate_legacy_store(days: dict) -> dict: if not LEGACY_FUND_HISTORY_PATH.is_file(): return days try: loaded = json.loads(LEGACY_FUND_HISTORY_PATH.read_text(encoding="utf-8")) legacy_days = loaded.get("days") if isinstance(loaded, dict) else {} if not isinstance(legacy_days, dict): return days merged = dict(days) for day, block in legacy_days.items(): if day in merged: continue if isinstance(block, dict) and block.get("accounts"): merged[day] = block return merged except Exception: return days def _load_store() -> dict: if not FUND_HISTORY_PATH.is_file(): store = {"version": 1, "days": _migrate_legacy_store({})} if store["days"]: _atomic_write(FUND_HISTORY_PATH, store) return store try: loaded = json.loads(FUND_HISTORY_PATH.read_text(encoding="utf-8")) if isinstance(loaded, dict): loaded.setdefault("version", 1) days = dict(loaded.get("days") or {}) loaded["days"] = _migrate_legacy_store(days) return loaded except Exception: pass return {"version": 1, "days": {}} def record_fund_snapshot( trading_day: str, accounts: list[dict], *, keep_days: int = FUND_HISTORY_DAYS, reset_hour: int = 8, ) -> dict[str, Any]: """写入当日各户资金账户/交易账户余额,并裁剪历史。""" day = (trading_day or "").strip()[:10] or current_trading_day(reset_hour=reset_hour) start = fund_history_start_day() if start and day < start: return _load_store().get("days") or {} store = _load_store() days = dict(store.get("days") or {}) row_accounts: dict[str, dict] = {} for ac in accounts or []: key = str(ac.get("key") or ac.get("id") or "").strip() if not key: continue if not ac.get("monitored"): continue fu = _safe_float(ac.get("funding_usdt")) tu = _safe_float(ac.get("trading_usdt")) total = account_total_usdt(fu, tu) if total is None: continue row_accounts[key] = { "name": ac.get("name"), "funding_usdt": fu, "trading_usdt": tu, "total_usdt": total, "recorded_at": _now_str(), } if row_accounts: days[day] = {"accounts": row_accounts, "updated_at": _now_str()} days = _prune_days( days, keep_days=keep_days, anchor_day=day, start_day=fund_history_start_day() ) _atomic_write(FUND_HISTORY_PATH, {"version": 1, "days": days}) return days def record_fund_snapshot_from_board( rows: list[dict], *, keep_days: int = FUND_HISTORY_DAYS, reset_hour: int = 8, ) -> dict[str, Any]: """监控板行写入当日快照(仅 account_ok 且资金/交易户齐全)。""" day = current_trading_day(reset_hour=reset_hour) accounts = [] for row in rows or []: if not isinstance(row, dict): continue if not row.get("account_ok"): continue accounts.append( { "key": row.get("key") or row.get("id"), "name": row.get("name"), "funding_usdt": row.get("funding_usdt"), "trading_usdt": row.get("trading_usdt"), "monitored": True, } ) return record_fund_snapshot(day, accounts, keep_days=keep_days, reset_hour=reset_hour) def get_fund_history(*, anchor_day: str, keep_days: int = FUND_HISTORY_DAYS) -> dict[str, dict]: store = _load_store() return _prune_days( dict(store.get("days") or {}), keep_days=keep_days, anchor_day=anchor_day, start_day=fund_history_start_day(), ) def _exchange_monitored(ex: dict) -> bool: return bool(ex.get("enabled")) and not bool(ex.get("env_disabled")) def _live_row_for_exchange(ex: dict, rows_by_key: dict[str, dict]) -> Optional[dict]: key = str(ex.get("key") or "").strip() if not key: return None return rows_by_key.get(key) def _series_from_history( history: dict[str, dict], account_keys: list[str], ) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for day in sorted(history.keys()): block = history.get(day) or {} ac_map = block.get("accounts") or {} total = 0.0 n = 0 for key in account_keys: ac = ac_map.get(key) or {} t = account_total_usdt(ac.get("funding_usdt"), ac.get("trading_usdt")) if t is None: t = _safe_float(ac.get("total_usdt")) if t is None: continue total += t n += 1 if n > 0: out.append({"day": day, "total_usdt": round(total, 4)}) return out def _account_series(history: dict[str, dict], key: str) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for day in sorted(history.keys()): ac = (history.get(day) or {}).get("accounts", {}).get(key) or {} t = account_total_usdt(ac.get("funding_usdt"), ac.get("trading_usdt")) if t is None: t = _safe_float(ac.get("total_usdt")) if t is None: continue out.append( { "day": day, "total_usdt": t, "funding_usdt": _safe_float(ac.get("funding_usdt")), "trading_usdt": _safe_float(ac.get("trading_usdt")), } ) return out def build_fund_overview( exchanges: list[dict], *, board_rows: Optional[list[dict]] = None, trading_day: Optional[str] = None, keep_days: int = FUND_HISTORY_DAYS, reset_hour: int = 8, updated_at: Optional[str] = None, ) -> dict[str, Any]: day = (trading_day or "").strip()[:10] or current_trading_day(reset_hour=reset_hour) history = get_fund_history(anchor_day=day, keep_days=keep_days) rows_by_key: dict[str, dict] = {} for row in board_rows or []: if isinstance(row, dict): k = str(row.get("key") or "").strip() if k: rows_by_key[k] = row monitored_keys: list[str] = [] accounts_out: list[dict[str, Any]] = [] live_total = 0.0 live_known = 0 for ex in exchanges or []: if not _exchange_monitored(ex): continue key = str(ex.get("key") or "").strip() monitored = True row = _live_row_for_exchange(ex, rows_by_key) fu = tu = total = None data_ok = False if row and row.get("account_ok"): fu = _safe_float(row.get("funding_usdt")) tu = _safe_float(row.get("trading_usdt")) total = account_total_usdt(fu, tu) data_ok = total is not None if data_ok: live_total += total live_known += 1 series = _account_series(history, key) if key else [] dd = compute_drawdown([p["total_usdt"] for p in series]) if series else { "peak_usdt": None, "max_drawdown_u": None, "max_drawdown_pct": None, } day_delta = None if series: if len(series) >= 2: day_delta = round(series[-1]["total_usdt"] - series[-2]["total_usdt"], 4) elif data_ok and total is not None: day_delta = round(total - series[-1]["total_usdt"], 4) accounts_out.append( { "id": ex.get("id"), "key": key, "name": ex.get("name") or key, "monitored": monitored, "data_ok": data_ok, "funding_usdt": fu, "trading_usdt": tu, "total_usdt": total, "series": series, "drawdown": dd, "day_delta_usdt": day_delta, } ) if key: monitored_keys.append(key) total_series = _series_from_history(history, monitored_keys) if live_known > 0: last_day = total_series[-1]["day"] if total_series else None live_point = round(live_total, 4) if last_day == day and total_series: total_series[-1]["total_usdt"] = live_point total_series[-1]["live"] = True else: total_series.append({"day": day, "total_usdt": live_point, "live": True}) total_dd = compute_drawdown([p["total_usdt"] for p in total_series]) if total_series else { "peak_usdt": None, "max_drawdown_u": None, "max_drawdown_pct": None, } total_day_delta = None if total_series: if len(total_series) >= 2: total_day_delta = round( total_series[-1]["total_usdt"] - total_series[-2]["total_usdt"], 4 ) return { "ok": True, "trading_day": day, "reset_hour": reset_hour, "keep_days": keep_days, "history_start_day": fund_history_start_day(), "updated_at": updated_at, "totals": { "monitored_count": len(monitored_keys), "live_known_count": live_known, "total_usdt": round(live_total, 4) if live_known > 0 else None, "day_delta_usdt": total_day_delta, "series": total_series, "drawdown": total_dd, }, "accounts": accounts_out, } def format_fund_history_text( history: dict[str, dict], *, account_names: Optional[dict[str, str]] = None, ) -> str: if not history: return "(暂无资金历史快照)" names = account_names or {} lines = ["【资金快照(资金账户 + 交易账户 USDT)】"] for day in sorted(history.keys()): block = history.get(day) or {} ac_map = block.get("accounts") or {} if not ac_map: continue parts = [] for key, ac in ac_map.items(): label = names.get(key) or ac.get("name") or key fu = ac.get("funding_usdt") tu = ac.get("trading_usdt") tot = ac.get("total_usdt") if tot is None: tot = account_total_usdt(fu, tu) fu_txt = f"{fu}U" if fu is not None else "未知" tu_txt = f"{tu}U" if tu is not None else "未知" tot_txt = f"{tot}U" if tot is not None else "未知" parts.append(f"{label}: 合计{tot_txt}(资金{fu_txt}/交易{tu_txt})") lines.append(f"- {day}: " + ";".join(parts)) return "\n".join(lines) if len(lines) > 1 else "(暂无资金历史快照)"