feat: 内照明心交易日历与交易所口径成交额/手续费统计

新增按 08:00 切日的月历(盈亏、笔数、犯病日高亮与点击筛选);平仓时从交易所 fill 写入双边成交额与手续费,统计表与明细同步展示。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-30 08:05:46 +08:00
parent 865567fbd3
commit 6b872b1f43
13 changed files with 1113 additions and 12 deletions
+229
View File
@@ -0,0 +1,229 @@
"""平仓交易:交易所口径双边成交额与手续费(四所共用聚合逻辑)。"""
from __future__ import annotations
from typing import Any, Callable, Optional
def _coerce_ts_ms(raw: Any) -> int | None:
if raw in (None, ""):
return None
try:
v = int(raw)
return v if v > 1_000_000_000_000 else v * 1000
except (TypeError, ValueError):
return None
def quote_turnover_usdt_from_fill(trade: dict, *, contract_size: float = 1.0) -> float:
"""单笔成交的报价币成交额(USDT 口径)。"""
info = trade.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in ("quoteQty", "quote_qty", "fillNotionalUsd", "notional"):
try:
v = float(info.get(key) or 0)
if v > 0:
return abs(v)
except (TypeError, ValueError):
continue
try:
cost = float(trade.get("cost") or 0)
if cost > 0:
return abs(cost)
except (TypeError, ValueError):
pass
try:
price = float(trade.get("price") or 0)
amount = float(trade.get("amount") or 0) * float(contract_size or 1.0)
if price > 0 and amount > 0:
return abs(price * amount)
except (TypeError, ValueError):
pass
return 0.0
def commission_usdt_from_fill(trade: dict) -> float:
"""单笔成交手续费(正数表示成本)。"""
fee = trade.get("fee")
if isinstance(fee, dict):
try:
cost = float(fee.get("cost") or 0)
except (TypeError, ValueError):
cost = 0.0
if cost != 0:
cur = str(fee.get("currency") or "USDT").upper()
if cur in ("USDT", "USD", "BUSD", "USDC"):
return abs(cost)
return abs(cost)
info = trade.get("info") or {}
if isinstance(info, dict):
for key in ("fee", "commission", "fillFee"):
try:
v = float(info.get(key) or 0)
if v != 0:
return abs(v)
except (TypeError, ValueError):
continue
return 0.0
def aggregate_bilateral_stats(
fills: list[dict],
*,
contract_size: float = 1.0,
) -> dict[str, float] | None:
"""双边成交额 = 开+平所有相关 fill 的报价币成交额之和;手续费 = fill fee 之和。"""
if not fills:
return None
turnover = 0.0
commission = 0.0
for t in fills:
turnover += quote_turnover_usdt_from_fill(t, contract_size=contract_size)
commission += commission_usdt_from_fill(t)
if turnover <= 0 and commission <= 0:
return None
return {
"exchange_turnover_usdt": round(turnover, 4),
"exchange_commission_usdt": round(commission, 4),
}
def filter_position_lifecycle_fills(
trades: list[dict],
direction: str,
open_ms: int | None,
close_ms: int | None,
*,
hedge_mode: bool = False,
close_buffer_ms: int = 15 * 60 * 1000,
) -> list[dict]:
"""
持仓生命周期内 fill:多=开买+平卖;空=开卖+平买。
hedge_mode 时按 posSide 与 direction 过滤。
"""
direction = (direction or "long").strip().lower()
open_side = "buy" if direction == "long" else "sell"
close_side = "sell" if direction == "long" else "buy"
allowed_sides = {open_side, close_side}
upper = int(close_ms) + int(close_buffer_ms) if close_ms else None
out: list[dict] = []
for t in trades or []:
side = (t.get("side") or "").lower()
if side not in allowed_sides:
continue
ts = _coerce_ts_ms(t.get("timestamp"))
if ts is None:
continue
if open_ms and ts < int(open_ms) - 60_000:
continue
if upper and ts > upper:
continue
if hedge_mode:
info = t.get("info") or {}
if not isinstance(info, dict):
info = {}
pos_side = (info.get("posSide") or t.get("posSide") or "").lower()
if pos_side in ("long", "short") and pos_side != direction:
continue
out.append(t)
out.sort(key=lambda x: x.get("timestamp") or 0)
return out
def sum_binance_commission_income(entries: list[dict], trade_ids: set[str] | None) -> float | None:
"""Binance income 流水中 COMMISSION 合计(负值取绝对值为成本)。"""
if not entries:
return None
total = 0.0
found = False
for e in entries:
it = (e.get("incomeType") or e.get("income_type") or "").strip()
if it != "COMMISSION":
continue
if trade_ids:
tid = str(e.get("tradeId") or e.get("trade_id") or "").strip()
if tid and tid not in trade_ids:
continue
try:
total += float(e.get("income") or 0)
found = True
except (TypeError, ValueError):
continue
if not found:
return None
return round(abs(total), 4)
def trade_ids_from_fills(fills: list[dict]) -> set[str]:
out: set[str] = set()
for t in fills or []:
info = t.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in ("id", "tradeId", "trade_id"):
raw = t.get(key) if key in t else info.get(key)
if raw is not None and str(raw).strip():
out.add(str(raw).strip())
break
return out
def merge_commission_prefer_income(
fill_commission: float,
income_commission: float | None,
) -> float:
if income_commission is not None and income_commission > 0:
return round(income_commission, 4)
return round(max(fill_commission, 0.0), 4)
def update_trade_record_stats_columns(
conn: Any,
trade_id: int,
turnover_usdt: float | None,
commission_usdt: float | None,
) -> None:
if turnover_usdt is None and commission_usdt is None:
return
conn.execute(
"""
UPDATE trade_records
SET exchange_turnover_usdt = COALESCE(?, exchange_turnover_usdt),
exchange_commission_usdt = COALESCE(?, exchange_commission_usdt)
WHERE id = ?
""",
(turnover_usdt, commission_usdt, int(trade_id)),
)
def attach_exchange_stats_to_trade(
conn: Any,
trade_id: int,
*,
fetch_fills: Callable[[], list[dict]],
contract_size: float = 1.0,
income_commission: float | None = None,
) -> dict[str, float] | None:
"""拉 fill 并写库;仅在新单平仓路径调用。"""
try:
fills = fetch_fills() or []
except Exception:
fills = []
stats = aggregate_bilateral_stats(fills, contract_size=contract_size)
if not stats and income_commission is None:
return None
turnover = stats.get("exchange_turnover_usdt") if stats else None
fill_comm = float(stats.get("exchange_commission_usdt") or 0) if stats else 0.0
commission = merge_commission_prefer_income(fill_comm, income_commission)
update_trade_record_stats_columns(
conn,
trade_id,
turnover,
commission if commission > 0 else None,
)
out = {}
if turnover is not None:
out["exchange_turnover_usdt"] = turnover
if commission > 0:
out["exchange_commission_usdt"] = commission
return out or None