From c365a89c470147496bbc27652da8171cdec559bf Mon Sep 17 00:00:00 2001 From: dekun Date: Tue, 19 May 2026 18:04:33 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=B8=81=E5=AE=89=E4=BA=A4?= =?UTF-8?q?=E6=98=93=E8=AE=B0=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crypto_monitor_binance/.env.example | 2 +- crypto_monitor_binance/app.py | 249 +++++++++++++++------------- 2 files changed, 139 insertions(+), 112 deletions(-) diff --git a/crypto_monitor_binance/.env.example b/crypto_monitor_binance/.env.example index bf410f6..61f1829 100644 --- a/crypto_monitor_binance/.env.example +++ b/crypto_monitor_binance/.env.example @@ -75,7 +75,7 @@ BINANCE_TRIGGER_WORKING_TYPE=CONTRACT_PRICE # EXCHANGE_DISPLAY_NAME=Binance # 企业微信推送里展示的账户备注 # BINANCE_ACCOUNT_LABEL=binance实盘账户 -# 盈亏同步口径:false=与 App「仓位历史-实现盈亏」一致(不含资金费);true=含资金费等完整流水 +# 盈亏同步:false=按仓位历史口径(已实现盈亏+手续费,不含资金费);true=含资金费 # BINANCE_PNL_INCLUDE_FUNDING=false # ============================================================================= diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 85210be..11ac5b6 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -162,8 +162,9 @@ ORDER_MONITOR_TYPE_KEY_AUTO = "关键位监控" KEY_MONITOR_AUTO_TYPES = frozenset({"箱体突破", "收敛突破"}) EXCHANGE_POSITION_SYNC_FROM_BJ = (os.getenv("EXCHANGE_POSITION_SYNC_FROM_BJ") or "").strip() EXCHANGE_POSITION_HISTORY_LIMIT = max(50, min(1000, int(os.getenv("EXCHANGE_POSITION_HISTORY_LIMIT", "200")))) -# 与币安 App「仓位历史-实现盈亏」对齐:仅已实现盈亏 + 手续费(不含资金费) -BINANCE_APP_PNL_INCOME_TYPES = frozenset({"REALIZED_PNL", "COMMISSION"}) +# 与币安 App「仓位历史-实现盈亏」对齐:默认仅 REALIZED_PNL(手续费另计;避免与 COMMISSION 重复扣) +BINANCE_APP_PNL_INCOME_TYPES = frozenset({"REALIZED_PNL"}) +BINANCE_APP_PNL_INCOME_WITH_FEE = frozenset({"REALIZED_PNL", "COMMISSION"}) BINANCE_NET_INCOME_TYPES = frozenset( {"REALIZED_PNL", "COMMISSION", "FUNDING_FEE", "INSURANCE_CLEAR", "INTERNAL_AUTO_CLOSE"} ) @@ -2068,19 +2069,47 @@ def get_plan_notional_usdt(row_or_dict): def _trade_ids_from_fills(trades): + """仅使用 Binance 原始 tradeId(与 income 流水一致),不用 ccxt 的 id。""" ids = set() for t in trades or []: info = t.get("info") if isinstance(t.get("info"), dict) else {} - for src in (t, info): - if not isinstance(src, dict): - continue - for k in ("tradeId", "trade_id", "id"): - v = src.get(k) - if v is not None and str(v).strip() != "": - ids.add(str(v).strip()) + for k in ("tradeId", "trade_id"): + v = info.get(k) + if v is not None and str(v).strip() != "": + ids.add(str(v).strip()) return ids +def _cluster_closing_trades_near_close(trades, closed_ms, spread_ms=8 * 60 * 1000): + """只保留平仓时刻附近的一簇减仓成交,避免把相邻其它仓位算进来。""" + if not trades: + return [] + if closed_ms is None: + return list(trades) + try: + closed_ms = int(closed_ms) + except (TypeError, ValueError): + return list(trades) + scored = [] + for t in trades: + ts = _coerce_ts_ms(t.get("timestamp")) + if ts is None: + continue + scored.append((abs(ts - closed_ms), t)) + if not scored: + return list(trades) + scored.sort(key=lambda x: x[0]) + anchor_ts = _coerce_ts_ms(scored[0][1].get("timestamp")) + if anchor_ts is None: + return [scored[0][1]] + return [ + t + for t in trades + if _coerce_ts_ms(t.get("timestamp")) is not None + and abs(_coerce_ts_ms(t.get("timestamp")) - anchor_ts) <= spread_ms + ] + + def _income_entry_trade_id(entry): if not isinstance(entry, dict): return "" @@ -2094,47 +2123,61 @@ def _income_entry_trade_id(entry): def calc_binance_realized_pnl_from_trades(trades): - """ - 按 Binance 成交回报汇总:sum(realizedPnl) + 手续费(与 App「仓位历史-实现盈亏」一致)。 - """ + """仅汇总成交回报中的 realizedPnl(勿再扣 commission,避免与 income 重复)。""" if not trades: return None total = 0.0 has = False for t in trades: info = t.get("info") if isinstance(t.get("info"), dict) else {} - for src in (info, t): - if not isinstance(src, dict): - continue - for key in ("realizedPnl", "realized_pnl"): - v = src.get(key) - if v is None or str(v).strip() == "": - continue - try: - total += float(v) - has = True - except (TypeError, ValueError): - pass - fee = t.get("fee") - if isinstance(fee, dict) and fee.get("cost") is not None: - try: - total += float(fee["cost"]) - has = True - except (TypeError, ValueError): - pass - comm = info.get("commission") - if comm is not None and str(comm).strip() != "": - try: - c = float(comm) - total -= c if c > 0 else -abs(c) - has = True - except (TypeError, ValueError): - pass + v = info.get("realizedPnl") + if v is None or str(v).strip() == "": + v = t.get("realizedPnl") or t.get("realized_pnl") + if v is None or str(v).strip() == "": + continue + try: + total += float(v) + has = True + except (TypeError, ValueError): + pass if not has: return None return round(total, FUNDS_DECIMALS) +def _sum_binance_income(entries, income_types, trade_ids=None): + net = 0.0 + first_t = None + last_t = None + strict = bool(trade_ids) + for e in entries: + it = (e.get("incomeType") or e.get("income_type") or "").strip() + if it not in income_types: + continue + if strict: + if it in ("REALIZED_PNL", "COMMISSION"): + tid = _income_entry_trade_id(e) + if not tid or tid not in trade_ids: + continue + else: + continue + elif trade_ids and it in ("REALIZED_PNL", "COMMISSION"): + tid = _income_entry_trade_id(e) + if tid and tid not in trade_ids: + continue + try: + net += float(e.get("income") or 0) + except (TypeError, ValueError): + pass + t = _coerce_ts_ms(e.get("time")) + if t: + first_t = t if first_t is None else min(first_t, t) + last_t = t if last_t is None else max(last_t, t) + if first_t is None: + return None, None, None + return round(net, FUNDS_DECIMALS), first_t, last_t + + def calc_pnl_from_closing_trades(direction, entry_price, trades, exchange_symbol=None): """按减仓成交数量×价差汇总盈亏(不含资金费;比单点标记价更接近交易所)。""" try: @@ -2208,6 +2251,8 @@ def resolve_trade_pnl_amount( opened_at_ms=open_ms, closed_at_ms=close_ms, ) + if closing_trades and close_ms: + closing_trades = _cluster_closing_trades_near_close(closing_trades, int(close_ms)) if closing_trades: wexit = calc_weighted_exit_price(closing_trades) if wexit and (exit_price is None or float(exit_price or 0) <= 0): @@ -3613,9 +3658,7 @@ def fetch_closing_fills_for_record(exchange_symbol, direction, opened_at_str, cl since_ms = _to_ms_with_fallback(opened_at_ms, opened_at_str) close_side = "sell" if direction == "long" else "buy" closed_ms = _to_ms_with_fallback(closed_at_ms, closed_at_str) if (closed_at_str or closed_at_ms is not None) else None - # 历史记录回填给一点缓冲,兼容成交落在记录时间附近的情况 - if closed_ms is not None: - closed_ms += 6 * 60 * 60 * 1000 + close_upper_ms = (int(closed_ms) + 15 * 60 * 1000) if closed_ms is not None else None candidates = [] all_side_candidates = [] try: @@ -3639,7 +3682,7 @@ def fetch_closing_fills_for_record(exchange_symbol, direction, opened_at_str, cl continue if since_ms and ts < since_ms: continue - if closed_ms and ts > closed_ms: + if close_upper_ms and ts > close_upper_ms: continue info = t.get("info") or {} if not isinstance(info, dict): @@ -3649,10 +3692,6 @@ def fetch_closing_fills_for_record(exchange_symbol, direction, opened_at_str, cl if pos_side in ("long", "short") and pos_side != direction: continue all_side_candidates.append(t) - if since_ms and ts < since_ms: - continue - if closed_ms and ts > closed_ms: - continue candidates.append(t) candidates.sort(key=lambda x: x.get("timestamp") or 0) if candidates: @@ -3663,25 +3702,21 @@ def fetch_closing_fills_for_record(exchange_symbol, direction, opened_at_str, cl if not all_side_candidates: return [] if not closed_ms: - return all_side_candidates[-20:] + return all_side_candidates[-5:] near = [] for t in all_side_candidates: - ts = t.get("timestamp") + ts = _coerce_ts_ms(t.get("timestamp")) if ts is None: continue - try: - delta = abs(int(ts) - int(closed_ms)) - except Exception: - continue - # 放宽到前后 7 天 - if delta <= 7 * 24 * 60 * 60 * 1000: + delta = abs(ts - int(closed_ms)) + if delta <= 45 * 60 * 1000: near.append((delta, t)) if near: near.sort(key=lambda x: x[0]) - picked = [x[1] for x in near[:20]] + picked = [x[1] for x in near[:12]] picked.sort(key=lambda x: x.get("timestamp") or 0) - return picked - return all_side_candidates[-20:] + return _cluster_closing_trades_near_close(picked, int(closed_ms)) + return _cluster_closing_trades_near_close(all_side_candidates[-5:], int(closed_ms)) def calc_weighted_exit_price(trades): @@ -5482,66 +5517,58 @@ def fetch_binance_net_pnl_for_trade( ): if open_ms is None or close_ms is None or close_ms < open_ms: return None, None, None, None - trade_ids = _trade_ids_from_fills(closing_trades) if closing_trades else None if closing_trades: - trade_pnl = calc_binance_realized_pnl_from_trades(closing_trades) - if trade_pnl is not None: - first_t = None - last_t = None - for t in closing_trades: - ts = _coerce_ts_ms(t.get("timestamp")) - if ts: - first_t = ts if first_t is None else min(first_t, ts) - last_t = ts if last_t is None else max(last_t, ts) - ensure_markets_loaded() - market = exchange.market(exchange_symbol) - cid = market.get("id") or exchange_symbol - sync_key = f"trades|{cid}|{direction}|{open_ms}|{close_ms}|{trade_pnl}" - eo = ms_to_app_local_str(first_t) if first_t else None - ec = ms_to_app_local_str(last_t) if last_t else None - return trade_pnl, sync_key, eo, ec - income_types = ( - BINANCE_NET_INCOME_TYPES - if BINANCE_PNL_INCLUDE_FUNDING - else BINANCE_APP_PNL_INCOME_TYPES - ) - buffer_ms = 90 * 1000 if trade_ids else 5 * 60 * 1000 + closing_trades = _cluster_closing_trades_near_close(closing_trades, int(close_ms)) + trade_ids = _trade_ids_from_fills(closing_trades) if closing_trades else None + buffer_ms = 3 * 60 * 1000 if trade_ids else 5 * 60 * 1000 entries = _fetch_binance_income_entries( exchange_symbol, max(0, int(open_ms) - buffer_ms), int(close_ms) + buffer_ms ) - if not entries: - return None, None, None, None - net = 0.0 - first_t = None - last_t = None - for e in entries: - it = (e.get("incomeType") or e.get("income_type") or "").strip() - if it not in income_types: - continue - if trade_ids and it in ("REALIZED_PNL", "COMMISSION"): - tid = _income_entry_trade_id(e) - if tid and tid not in trade_ids: - continue - if trade_ids and it == "FUNDING_FEE": - continue - try: - net += float(e.get("income") or 0) - except (TypeError, ValueError): - pass - t = _coerce_ts_ms(e.get("time")) - if t: - first_t = t if first_t is None else min(first_t, t) - last_t = t if last_t is None else max(last_t, t) - if first_t is None: - return None, None, None, None - net = round(net, FUNDS_DECIMALS) ensure_markets_loaded() market = exchange.market(exchange_symbol) cid = market.get("id") or exchange_symbol - sync_key = f"income|{cid}|{direction}|{open_ms}|{close_ms}|{net}" - eo = ms_to_app_local_str(first_t) - ec = ms_to_app_local_str(last_t) - return net, sync_key, eo, ec + + def _pack(net, first_t, last_t, prefix): + if net is None: + return None + sk = f"{prefix}|{cid}|{direction}|{open_ms}|{close_ms}|{net}" + eo = ms_to_app_local_str(first_t) if first_t else None + ec = ms_to_app_local_str(last_t) if last_t else None + return net, sk, eo, ec + + if entries and trade_ids: + net, ft, lt = _sum_binance_income(entries, BINANCE_APP_PNL_INCOME_WITH_FEE, trade_ids) + out = _pack(net, ft, lt, "income_net") + if out: + return out + net, ft, lt = _sum_binance_income(entries, BINANCE_APP_PNL_INCOME_TYPES, trade_ids) + out = _pack(net, ft, lt, "income_rp") + if out: + return out + + if closing_trades: + trade_pnl = calc_binance_realized_pnl_from_trades(closing_trades) + if trade_pnl is not None: + fts = [_coerce_ts_ms(t.get("timestamp")) for t in closing_trades] + fts = [x for x in fts if x] + ft = min(fts) if fts else None + lt = max(fts) if fts else None + out = _pack(trade_pnl, ft, lt, "trades_rp") + if out: + return out + + if entries: + loose_types = ( + BINANCE_NET_INCOME_TYPES + if BINANCE_PNL_INCLUDE_FUNDING + else BINANCE_APP_PNL_INCOME_WITH_FEE + ) + net, ft, lt = _sum_binance_income(entries, loose_types, trade_ids if trade_ids else None) + out = _pack(net, ft, lt, "income") + if out: + return out + + return None, None, None, None def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False):