From 2965744c454305be62cc58b00d11aae56b701368 Mon Sep 17 00:00:00 2001 From: dekun Date: Thu, 28 May 2026 12:02:03 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=94=AF=E6=92=91=E9=98=BB?= =?UTF-8?q?=E5=8A=9B=E7=9A=84=E4=BC=81=E4=B8=9A=E5=BE=AE=E4=BF=A1=E6=8E=A8?= =?UTF-8?q?=E9=80=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crypto_monitor_gate/.env.example | 4 ++ crypto_monitor_gate/app.py | 73 ++++++++++++++++++++++++-------- 2 files changed, 59 insertions(+), 18 deletions(-) diff --git a/crypto_monitor_gate/.env.example b/crypto_monitor_gate/.env.example index cf59823..ec71ea4 100644 --- a/crypto_monitor_gate/.env.example +++ b/crypto_monitor_gate/.env.example @@ -127,6 +127,10 @@ BALANCE_REFRESH_SECONDS=60 PRICE_REFRESH_SECONDS=5 # 后台监控轮询周期(秒) MONITOR_POLL_SECONDS=3 +# 重启后多少秒内不做「外部平仓」同步(避免 API 未就绪误判) +RECONCILE_STARTUP_GRACE_SEC=90 +# 连续多少次轮询确认交易所空仓后,才记为外部平仓(默认 3 次 ≈ 9 秒) +RECONCILE_FLAT_CONFIRM_POLLS=3 # 使用可用资金时的缓冲比例(如0.98代表用98%) FULL_MARGIN_BUFFER_RATIO=0.98 diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index 66c40fd..ec16788 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -227,7 +227,11 @@ AUTO_TRANSFER_BJ_HOUR = int(os.getenv("AUTO_TRANSFER_BJ_HOUR", "8")) WECHAT_TIMEOUT_SECONDS = int(os.getenv("WECHAT_TIMEOUT_SECONDS", "10")) AI_TIMEOUT_SECONDS = int(os.getenv("AI_TIMEOUT_SECONDS", "120")) MONITOR_POLL_SECONDS = int(os.getenv("MONITOR_POLL_SECONDS", "3")) +RECONCILE_STARTUP_GRACE_SEC = int(os.getenv("RECONCILE_STARTUP_GRACE_SEC", "90")) +RECONCILE_FLAT_CONFIRM_POLLS = max(1, int(os.getenv("RECONCILE_FLAT_CONFIRM_POLLS", "3"))) KLINE_TIMEFRAME = os.getenv("KLINE_TIMEFRAME", "5m") +_APP_STARTED_AT = time.time() +_RECONCILE_FLAT_STREAK = {} FULL_MARGIN_BUFFER_RATIO = float(os.getenv("FULL_MARGIN_BUFFER_RATIO", "0.98")) TRANSFER_CCY = os.getenv("TRANSFER_CCY", "USDT") UPLOAD_FOLDER = resolve_path(os.getenv("UPLOAD_DIR", "static/images")) @@ -3339,38 +3343,54 @@ def is_no_position_error(err_msg): return any(k in msg for k in keywords) -def get_live_position_contracts(exchange_symbol, direction): - ensure_markets_loaded() +def _gate_fetch_position_rows(exchange_symbol): + """优先拉 USDT 本位全量持仓(与页面一致),避免单合约查询在重启后返回空列表误判空仓。""" try: - rows = exchange.fetch_positions([exchange_symbol]) + ensure_markets_loaded() except Exception: return None + try: + return exchange.fetch_positions(None, {"settle": "usdt"}) or [] + except Exception: + pass + if not exchange_symbol: + return None + try: + return exchange.fetch_positions([exchange_symbol]) or [] + except Exception: + return None + + +def _sum_live_position_contracts(rows, exchange_symbol, direction, relax_direction=False): total = 0.0 + if not rows: + return total + direction = (direction or "long").strip().lower() for p in rows: if not _position_matches_wanted_contract(exchange_symbol, p): continue - info = p.get("info", {}) or {} - side = (p.get("side") or info.get("posSide") or "").lower() - contracts = p.get("contracts") - if contracts is None: - raw_pos = info.get("pos") or info.get("size") - try: - contracts = abs(float(raw_pos)) if raw_pos is not None else 0.0 - except Exception: - contracts = 0.0 - try: - contracts = float(contracts) - except Exception: - contracts = 0.0 + contracts = _position_row_effective_contracts(p) if contracts <= 0: continue - if GATE_POS_MODE == "hedge": + if (not relax_direction) and GATE_POS_MODE == "hedge": + info = p.get("info", {}) or {} + side = (p.get("side") or info.get("posSide") or "").lower() if side and side != direction: continue total += contracts return total +def get_live_position_contracts(exchange_symbol, direction): + rows = _gate_fetch_position_rows(exchange_symbol) + if rows is None: + return None + total = _sum_live_position_contracts(rows, exchange_symbol, direction, relax_direction=False) + if total <= 0 and GATE_POS_MODE == "hedge": + total = _sum_live_position_contracts(rows, exchange_symbol, direction, relax_direction=True) + return total + + def _select_live_position_row(rows, exchange_symbol, direction, relax_hedge=False): """在 fetch_positions 结果中取与当前监控方向一致、张数最大的一条(与 get_live_position_contracts 过滤规则一致)。""" if not rows: @@ -3805,6 +3825,11 @@ def resolve_synced_flat_close(row, opened_at_str, opened_at_ms=None): def reconcile_external_closes(conn, days=None): + global _RECONCILE_FLAT_STREAK + if not exchange_private_api_configured(): + return 0 + if time.time() - _APP_STARTED_AT < RECONCILE_STARTUP_GRACE_SEC: + return 0 synced_count = 0 cutoff_ms = None if days is not None: @@ -3822,12 +3847,24 @@ def reconcile_external_closes(conn, days=None): # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 if opened_ms is None or opened_ms < cutoff_ms: continue - exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"]) + exchange_symbol = resolve_monitor_exchange_symbol(r) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) + oid = int(r["id"]) if live_contracts is None: + _RECONCILE_FLAT_STREAK.pop(oid, None) continue if live_contracts > 0: + _RECONCILE_FLAT_STREAK.pop(oid, None) continue + streak = int(_RECONCILE_FLAT_STREAK.get(oid, 0)) + 1 + _RECONCILE_FLAT_STREAK[oid] = streak + if streak < RECONCILE_FLAT_CONFIRM_POLLS: + continue + _RECONCILE_FLAT_STREAK.pop(oid, None) + print( + f"[reconcile_external_closes] {r['symbol']} id={oid} " + f"flat x{streak} polls -> sync close" + ) cancel_gate_swap_trigger_orders(exchange_symbol) opened_at = get_opened_at_value(r) opened_at_ms = _to_ms_with_fallback(r["opened_at_ms"] if "opened_at_ms" in r.keys() else None, opened_at)