"""平仓交易:交易所口径双边成交额与手续费(四所共用聚合逻辑)。""" from __future__ import annotations from typing import Any, Callable, Optional def _coerce_ts_ms(raw: Any) -> int | None: if raw in (None, ""): return None try: v = int(raw) return v if v > 1_000_000_000_000 else v * 1000 except (TypeError, ValueError): return None def quote_turnover_usdt_from_fill(trade: dict, *, contract_size: float = 1.0) -> float: """单笔成交的报价币成交额(USDT 口径)。""" info = trade.get("info") or {} if not isinstance(info, dict): info = {} for key in ("quoteQty", "quote_qty", "fillNotionalUsd", "notional"): try: v = float(info.get(key) or 0) if v > 0: return abs(v) except (TypeError, ValueError): continue try: cost = float(trade.get("cost") or 0) if cost > 0: return abs(cost) except (TypeError, ValueError): pass try: price = float(trade.get("price") or 0) amount = float(trade.get("amount") or 0) * float(contract_size or 1.0) if price > 0 and amount > 0: return abs(price * amount) except (TypeError, ValueError): pass return 0.0 def commission_usdt_from_fill(trade: dict) -> float: """单笔成交手续费(正数表示成本)。""" fee = trade.get("fee") if isinstance(fee, dict): try: cost = float(fee.get("cost") or 0) except (TypeError, ValueError): cost = 0.0 if cost != 0: cur = str(fee.get("currency") or "USDT").upper() if cur in ("USDT", "USD", "BUSD", "USDC"): return abs(cost) return abs(cost) info = trade.get("info") or {} if isinstance(info, dict): for key in ("fee", "commission", "fillFee"): try: v = float(info.get(key) or 0) if v != 0: return abs(v) except (TypeError, ValueError): continue return 0.0 def aggregate_bilateral_stats( fills: list[dict], *, contract_size: float = 1.0, ) -> dict[str, float] | None: """双边成交额 = 开+平所有相关 fill 的报价币成交额之和;手续费 = fill fee 之和。""" if not fills: return None turnover = 0.0 commission = 0.0 for t in fills: turnover += quote_turnover_usdt_from_fill(t, contract_size=contract_size) commission += commission_usdt_from_fill(t) if turnover <= 0 and commission <= 0: return None return { "exchange_turnover_usdt": round(turnover, 4), "exchange_commission_usdt": round(commission, 4), } def filter_position_lifecycle_fills( trades: list[dict], direction: str, open_ms: int | None, close_ms: int | None, *, hedge_mode: bool = False, close_buffer_ms: int = 15 * 60 * 1000, ) -> list[dict]: """ 持仓生命周期内 fill:多=开买+平卖;空=开卖+平买。 hedge_mode 时按 posSide 与 direction 过滤。 """ direction = (direction or "long").strip().lower() open_side = "buy" if direction == "long" else "sell" close_side = "sell" if direction == "long" else "buy" allowed_sides = {open_side, close_side} upper = int(close_ms) + int(close_buffer_ms) if close_ms else None out: list[dict] = [] for t in trades or []: side = (t.get("side") or "").lower() if side not in allowed_sides: continue ts = _coerce_ts_ms(t.get("timestamp")) if ts is None: continue if open_ms and ts < int(open_ms) - 60_000: continue if upper and ts > upper: continue if hedge_mode: info = t.get("info") or {} if not isinstance(info, dict): info = {} pos_side = (info.get("posSide") or t.get("posSide") or "").lower() if pos_side in ("long", "short") and pos_side != direction: continue out.append(t) out.sort(key=lambda x: x.get("timestamp") or 0) return out def sum_binance_commission_income(entries: list[dict], trade_ids: set[str] | None) -> float | None: """Binance income 流水中 COMMISSION 合计(负值取绝对值为成本)。""" if not entries: return None total = 0.0 found = False for e in entries: it = (e.get("incomeType") or e.get("income_type") or "").strip() if it != "COMMISSION": continue if trade_ids: tid = str(e.get("tradeId") or e.get("trade_id") or "").strip() if tid and tid not in trade_ids: continue try: total += float(e.get("income") or 0) found = True except (TypeError, ValueError): continue if not found: return None return round(abs(total), 4) def trade_ids_from_fills(fills: list[dict]) -> set[str]: out: set[str] = set() for t in fills or []: info = t.get("info") or {} if not isinstance(info, dict): info = {} for key in ("id", "tradeId", "trade_id"): raw = t.get(key) if key in t else info.get(key) if raw is not None and str(raw).strip(): out.add(str(raw).strip()) break return out def merge_commission_prefer_income( fill_commission: float, income_commission: float | None, ) -> float: if income_commission is not None and income_commission > 0: return round(income_commission, 4) return round(max(fill_commission, 0.0), 4) def update_trade_record_stats_columns( conn: Any, trade_id: int, turnover_usdt: float | None, commission_usdt: float | None, ) -> None: if turnover_usdt is None and commission_usdt is None: return conn.execute( """ UPDATE trade_records SET exchange_turnover_usdt = COALESCE(?, exchange_turnover_usdt), exchange_commission_usdt = COALESCE(?, exchange_commission_usdt) WHERE id = ? """, (turnover_usdt, commission_usdt, int(trade_id)), ) def attach_exchange_stats_to_trade( conn: Any, trade_id: int, *, fetch_fills: Callable[[], list[dict]], contract_size: float = 1.0, income_commission: float | None = None, ) -> dict[str, float] | None: """拉 fill 并写库;仅在新单平仓路径调用。""" try: fills = fetch_fills() or [] except Exception: fills = [] stats = aggregate_bilateral_stats(fills, contract_size=contract_size) if not stats and income_commission is None: return None turnover = stats.get("exchange_turnover_usdt") if stats else None fill_comm = float(stats.get("exchange_commission_usdt") or 0) if stats else 0.0 commission = merge_commission_prefer_income(fill_comm, income_commission) update_trade_record_stats_columns( conn, trade_id, turnover, commission if commission > 0 else None, ) out = {} if turnover is not None: out["exchange_turnover_usdt"] = turnover if commission > 0: out["exchange_commission_usdt"] = commission return out or None