From 9678cd54cefbd370f44db50841dd11351d3aeb31 Mon Sep 17 00:00:00 2001 From: dekun Date: Sat, 30 May 2026 09:43:50 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dokx=20=E8=B6=8B=E5=8A=BF?= =?UTF-8?q?=E5=9B=9E=E8=B0=83?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crypto_monitor_okx/app.py | 67 +++++++++++++++++++++----------------- strategy_trend_register.py | 40 ++++++++++++++++++++++- 2 files changed, 77 insertions(+), 30 deletions(-) diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index 955a0ae..51cc6e9 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -2672,7 +2672,36 @@ def _position_matches_wanted_contract(exchange_symbol, position): if not position: return False sym = position.get("symbol") - return sym == exchange_symbol + if sym == exchange_symbol: + return True + try: + return normalize_okx_symbol(sym or "") == normalize_okx_symbol(exchange_symbol or "") + except Exception: + return False + + +def _fetch_okx_swap_position_rows(exchange_symbol=None): + """OKX 单合约 fetch_positions([sym]) 常返回空;与 /api/prices 一致拉全量 SWAP 再本地匹配。""" + ensure_markets_loaded() + rows = None + for fetcher in ( + lambda: exchange.fetch_positions(None, {"instType": OKX_POSITION_INST_TYPE}), + lambda: exchange.fetch_positions(), + ): + try: + rows = fetcher() or [] + break + except Exception: + continue + if rows is None: + return None + if not exchange_symbol: + return rows + out = [] + for p in rows: + if _position_matches_wanted_contract(exchange_symbol, p): + out.append(p) + return out def _select_live_position_row(rows, exchange_symbol, direction, relax_hedge=False): @@ -2975,35 +3004,15 @@ def is_no_position_error(err_msg): def get_live_position_contracts(exchange_symbol, direction): - ensure_markets_loaded() - try: - rows = exchange.fetch_positions([exchange_symbol], params={"instType": OKX_POSITION_INST_TYPE}) - except Exception: + rows = _fetch_okx_swap_position_rows(exchange_symbol) + if rows is None: return None - total = 0.0 - for p in rows: - if p.get("symbol") != exchange_symbol: - continue - info = p.get("info", {}) or {} - side = (p.get("side") or info.get("posSide") or "").lower() - contracts = p.get("contracts") - if contracts is None: - raw_pos = info.get("pos") - try: - contracts = abs(float(raw_pos)) if raw_pos is not None else 0.0 - except Exception: - contracts = 0.0 - try: - contracts = float(contracts) - except Exception: - contracts = 0.0 - if contracts <= 0: - continue - if OKX_POS_MODE == "hedge": - if side and side != direction: - continue - total += contracts - return total + if not rows: + return 0.0 + prow = _select_live_position_row(rows, exchange_symbol, direction) + if not prow: + return 0.0 + return _position_row_effective_contracts(prow) def opened_at_str_to_ms(opened_at_str): diff --git a/strategy_trend_register.py b/strategy_trend_register.py index 84f6ab9..f7e5ca3 100644 --- a/strategy_trend_register.py +++ b/strategy_trend_register.py @@ -32,6 +32,11 @@ from strategy_trade_labels import ( MONITOR_TYPE_TREND = MONITOR_TYPE_TREND_PULLBACK +# 趋势回调:交易所报空仓需连续 N 次轮询确认,避免 OKX 等 API 瞬时误判立即结束计划 +_TREND_FLAT_STREAK: dict[int, int] = {} +TREND_FLAT_CONFIRM_POLLS = max(1, int(os.getenv("TREND_FLAT_CONFIRM_POLLS", "3"))) +TREND_OPEN_GRACE_SEC = max(0, int(os.getenv("TREND_OPEN_GRACE_SEC", "90"))) + def trend_add_zone_label(direction: str) -> str: return "补仓下沿" if (direction or "long").strip().lower() == "short" else "补仓上沿" @@ -402,6 +407,36 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) - conn.commit() +def _trend_plan_open_age_sec(row, m) -> float: + opened_ms = None + try: + if "opened_at_ms" in row.keys() and row["opened_at_ms"]: + opened_ms = int(row["opened_at_ms"]) + except Exception: + opened_ms = None + to_ms = getattr(m, "_to_ms_with_fallback", None) + if callable(to_ms): + opened_ms = to_ms(opened_ms, row["opened_at"] if "opened_at" in row.keys() else None) + if not opened_ms: + return 999999.0 + return max(0.0, (time.time() * 1000 - opened_ms) / 1000.0) + + +def _should_finalize_trend_flat(row, pos, plan_id: int, m) -> bool: + """首仓后交易所报无仓:需过开仓宽限期 + 连续空仓轮询,避免误判止损。""" + if pos is None or float(pos) > 0: + _TREND_FLAT_STREAK.pop(plan_id, None) + return False + if not int(row["first_order_done"] or 0): + return False + if _trend_plan_open_age_sec(row, m) < TREND_OPEN_GRACE_SEC: + _TREND_FLAT_STREAK.pop(plan_id, None) + return False + streak = int(_TREND_FLAT_STREAK.get(plan_id, 0)) + 1 + _TREND_FLAT_STREAK[plan_id] = streak + return streak >= TREND_FLAT_CONFIRM_POLLS + + def check_trend_pullback_plans(cfg: dict) -> None: m = _m(cfg) ok_live, _ = m.ensure_exchange_live_ready() @@ -413,6 +448,7 @@ def check_trend_pullback_plans(cfg: dict) -> None: ).fetchall() for row in rows: try: + plan_id = int(row["id"]) sym = row["symbol"] direction = (row["direction"] or "long").lower() ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym) @@ -447,9 +483,11 @@ def check_trend_pullback_plans(cfg: dict) -> None: continue exit_p = pf _finalize_plan(cfg, conn, row, "止盈", exit_p) + _TREND_FLAT_STREAK.pop(plan_id, None) continue - if pos <= 0 and int(row["first_order_done"] or 0): + if _should_finalize_trend_flat(row, pos, plan_id, m): _finalize_plan(cfg, conn, row, "止损", pf) + _TREND_FLAT_STREAK.pop(plan_id, None) continue if int(row["first_order_done"] or 0) and legs_done < len(grid) and legs_done < len(leg_amounts): level = float(grid[legs_done])