diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index afb73b4..84982e7 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -180,7 +180,7 @@ BINANCE_PNL_INCLUDE_FUNDING = os.getenv("BINANCE_PNL_INCLUDE_FUNDING", "false"). # 与币安 App 仓位历史对齐目标误差(USDT) BINANCE_PNL_MATCH_TOLERANCE = max(0.01, float(os.getenv("BINANCE_PNL_MATCH_TOLERANCE", "0.05"))) _LAST_EXCHANGE_PNL_SYNC_AT = 0.0 -_BINANCE_CLOSED_POS_CACHE = {"at": 0.0, "hist": []} +_BINANCE_CLOSED_POS_CACHE = {"at": 0.0, "hist": [], "trade_counts": {}, "last_err": {}} KEY_MONITOR_ALERT_ONLY_TYPES = frozenset({"关键阻力位", "关键支撑位"}) AUTO_TRANSFER_ENABLED = os.getenv("AUTO_TRANSFER_ENABLED", "false").lower() == "true" AUTO_TRANSFER_AMOUNT = float(os.getenv("AUTO_TRANSFER_AMOUNT", "30")) @@ -5484,13 +5484,34 @@ def api_sync_exchange_pnl(): matched = 0 for r in rows: tr = conn.execute( - "SELECT exchange_sync_key FROM trade_records WHERE id=?", (int(r["id"]),) + "SELECT exchange_realized_pnl, exchange_sync_key FROM trade_records WHERE id=?", + (int(r["id"]),), ).fetchone() - if tr and str(tr["exchange_sync_key"] or "").startswith("pos|"): + if tr and tr["exchange_realized_pnl"] is not None: matched += 1 conn.commit() conn.close() - return jsonify({"ok": True, "synced": matched, "candidates": len(rows), "positions": len(_BINANCE_CLOSED_POS_CACHE.get("hist") or [])}) + tc = _BINANCE_CLOSED_POS_CACHE.get("trade_counts") or {} + trade_n = sum(int(v) for v in tc.values()) + pos_n = len(_BINANCE_CLOSED_POS_CACHE.get("hist") or []) + err = _BINANCE_CLOSED_POS_CACHE.get("last_err") or {} + msg = f"成交 {trade_n} 笔,重建仓位 {pos_n} 条,已写入 {matched}/{len(rows)} 条记录" + if trade_n == 0: + msg += "(未拉到成交,请检查 API 权限/代理/EXCHANGE_POSITION_SYNC_FROM_BJ)" + if pos_n == 0 and trade_n > 0: + msg += "(有成交但未识别出完整平仓,请核对持仓模式 BINANCE_POSITION_MODE)" + if err: + msg += f" 错误: {err}" + return jsonify( + { + "ok": True, + "synced": matched, + "candidates": len(rows), + "positions": pos_n, + "trade_counts": tc, + "msg": msg, + } + ) def _coerce_ts_ms(val): @@ -5522,17 +5543,28 @@ def _unified_symbol_for_match(symbol_str): def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=40): """分页拉取成交(Binance userTrades)。""" + global _BINANCE_CLOSED_POS_CACHE if not (BINANCE_API_KEY and BINANCE_API_SECRET): + _BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = "未配置 API Key" return [] ensure_markets_loaded() out = [] since = int(since_ms) if since_ms else None until_ms = int(until_ms) if until_ms else None - for _ in range(max_pages): + last_err = None + for page in range(max_pages): try: batch = exchange.fetch_my_trades(exchange_symbol, since=since, limit=1000) - except Exception: - break + except Exception as e: + last_err = str(e) + if page == 0 and since: + try: + batch = exchange.fetch_my_trades(exchange_symbol, limit=1000) + except Exception as e2: + last_err = str(e2) + break + else: + break if not batch: break last_ts = None @@ -5540,8 +5572,6 @@ def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pag ts = _coerce_ts_ms(t.get("timestamp")) if until_ms and ts and ts > until_ms: continue - if since and ts and ts < since: - continue out.append(t) if ts: last_ts = ts @@ -5550,6 +5580,8 @@ def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pag if last_ts is None: break since = last_ts + 1 + if last_err: + _BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = last_err return out @@ -5580,8 +5612,11 @@ def fetch_binance_closed_positions_history(symbols=None, force_refresh=False): since_ms = exchange_position_sync_since_ms() until_ms = int(time.time() * 1000) + 120_000 trades_by_symbol = {} + trade_counts = {} for ex_sym in sym_list: - trades_by_symbol[ex_sym] = _fetch_my_trades_paginated(ex_sym, since_ms, until_ms) + trades = _fetch_my_trades_paginated(ex_sym, since_ms, until_ms) + trades_by_symbol[ex_sym] = trades + trade_counts[ex_sym] = len(trades) def _contract_size(ex_sym): try: @@ -5595,6 +5630,7 @@ def fetch_binance_closed_positions_history(symbols=None, force_refresh=False): unified_symbol_fn=_unified_symbol_for_match, position_mode=BINANCE_POSITION_MODE, contract_size_fn=_contract_size, + since_ms=since_ms, ) prev = list(_BINANCE_CLOSED_POS_CACHE["hist"] or []) if prev and not force_refresh: @@ -5606,6 +5642,7 @@ def fetch_binance_closed_positions_history(symbols=None, force_refresh=False): hist = sorted(prev, key=lambda x: int(x.get("close_ms") or 0), reverse=True) _BINANCE_CLOSED_POS_CACHE["at"] = now _BINANCE_CLOSED_POS_CACHE["hist"] = hist + _BINANCE_CLOSED_POS_CACHE["trade_counts"] = trade_counts return hist @@ -5851,10 +5888,11 @@ def sync_trade_records_from_exchange(conn, force=False): except Exception: hist = [] used = set() - for tr in candidates: - sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" - if sk0 and str(sk0).startswith("pos|"): - used.add(str(sk0).strip()) + if not force: + for tr in candidates: + sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" + if sk0 and str(sk0).startswith("pos|"): + used.add(str(sk0).strip()) matched = 0 for tr in candidates: pos, _ = match_trade_record_to_position( diff --git a/crypto_monitor_binance/binance_closed_positions_lib.py b/crypto_monitor_binance/binance_closed_positions_lib.py index 0e6c9a7..0cfaff8 100644 --- a/crypto_monitor_binance/binance_closed_positions_lib.py +++ b/crypto_monitor_binance/binance_closed_positions_lib.py @@ -95,6 +95,7 @@ def rebuild_closed_positions_for_leg( position_mode="hedge", contract_size=1.0, qty_eps=1e-9, + since_ms=None, ): """从按时间排序的成交重建某一方向的已平仓位列表。""" legs = [t for t in trades if _trade_belongs_to_direction(t, direction, position_mode)] @@ -102,6 +103,9 @@ def rebuild_closed_positions_for_leg( closed = [] qty = 0.0 open_ms = None + since_anchor_ms = int(since_ms) if since_ms else None + if since_anchor_ms is None and legs: + since_anchor_ms = int(legs[0].get("timestamp") or 0) or None pnl_accum = 0.0 close_ms = None open_cost = 0.0 @@ -170,7 +174,14 @@ def rebuild_closed_positions_for_leg( cycle_ids.append(tid) continue - if delta < 0 and qty > qty_eps: + if delta < 0: + if qty <= qty_eps: + # 开仓早于拉取窗口:从首笔减仓起视为一段已平仓位 + qty = abs(delta) + open_ms = open_ms or since_anchor_ms or ts + pnl_accum = 0.0 + open_cost = open_qty = close_cost = close_qty = 0.0 + cycle_ids = [] reduce = min(qty, abs(delta)) qty -= reduce pnl_accum += trade_pnl_contribution(t) @@ -192,6 +203,7 @@ def rebuild_closed_positions_from_trades( unified_symbol_fn, position_mode="hedge", contract_size_fn=None, + since_ms=None, ): """ trades_by_symbol: {exchange_symbol: [ccxt trade dict, ...]} @@ -216,6 +228,7 @@ def rebuild_closed_positions_from_trades( trades, position_mode=position_mode, contract_size=cs, + since_ms=since_ms, ) ) out.sort(key=lambda x: int(x.get("close_ms") or 0), reverse=True) @@ -229,7 +242,7 @@ def match_trade_record_to_position( *, unified_symbol_fn, to_ms_fn, - max_close_delta_ms=90 * 60 * 1000, + max_close_delta_ms=120 * 60 * 1000, max_open_before_ms=15 * 60 * 1000, max_open_after_ms=15 * 86400 * 1000, ): diff --git a/crypto_monitor_binance/templates/index.html b/crypto_monitor_binance/templates/index.html index 9989d32..f512bda 100644 --- a/crypto_monitor_binance/templates/index.html +++ b/crypto_monitor_binance/templates/index.html @@ -1009,7 +1009,7 @@ function syncExchangePnl(force){ body: JSON.stringify({limit: 200, force: !!force}) }).then(r=>r.json()).then(data=>{ if(data.ok){ - alert("已同步 " + (data.synced||0) + " / " + (data.candidates||0) + " 条记录,页面将刷新"); + alert(data.msg || ("已同步 " + (data.synced||0) + " / " + (data.candidates||0) + " 条记录,页面将刷新")); window.location.reload(); } else { alert(data.msg || "同步失败");