diff --git a/install_trading.py b/install_trading.py index 121f5a7..477d528 100644 --- a/install_trading.py +++ b/install_trading.py @@ -128,22 +128,80 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception: return {} - def _ctp_positions(mode: str) -> list: + def _ctp_positions(mode: str, *, refresh_if_empty: bool = True) -> list: try: - return ctp_list_positions(mode) + return ctp_list_positions(mode, refresh_if_empty=refresh_if_empty) except Exception: return [] + def _ctp_pos_to_ths_code(p: dict) -> str: + sym = (p.get("symbol") or "").strip() + if not sym: + return "" + codes = ths_to_codes(sym) + if codes: + return codes.get("ths_code") or sym + return sym + + def _ensure_monitors_from_ctp(conn, mode: str) -> None: + """CTP 有持仓但本地无监控时,自动补写一条 active 记录供展示。""" + if not ctp_status(mode).get("connected"): + return + for p in _ctp_positions(mode, refresh_if_empty=True): + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + direction = p.get("direction") or "long" + ths = _ctp_pos_to_ths_code(p) + if not ths: + continue + if _find_active_monitor(conn, ths, direction): + continue + codes = ths_to_codes(ths) or {} + now_s = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ensure_monitor_order_columns(conn) + conn.execute( + """INSERT INTO trade_order_monitors ( + symbol, symbol_name, market_code, direction, lots, entry_price, + stop_loss, take_profit, initial_stop_loss, trailing_be, + open_time, monitor_type, status + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?, 'active')""", + ( + ths, + codes.get("name", ths) if codes else ths, + codes.get("market_code", "") if codes else "", + direction, + lots, + float(p.get("avg_price") or 0), + None, + None, + None, + 0, + now_s, + "ctp_sync", + ), + ) + def _match_ctp_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() b = (ths or "").lower() if a == b: return True + if a and b and a.split(".")[0] == b.split(".")[0]: + return True try: vnpy_sym, _ = ths_to_vnpy_symbol(ths) - return a == vnpy_sym.lower() + if a == vnpy_sym.lower(): + return True except Exception: - return False + pass + try: + vnpy_sym, _ = ths_to_vnpy_symbol(ctp_sym) + if vnpy_sym.lower() == b.split(".")[0]: + return True + except Exception: + pass + return False def _holding_duration(open_time: str, now_iso: str) -> str: try: @@ -543,11 +601,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mon.get("direction") or "long", mode, ) mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon - row = _compose_position_row( - conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, - ) - if row: - rows.append(row) + try: + row = _compose_position_row( + conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, + ) + if row: + rows.append(row) + except Exception as exc: + logger.warning("compose monitor row failed: %s", exc) for key, ctp in ctp_by_key.items(): if key in used_ctp_keys: @@ -570,11 +631,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mon = _find_active_monitor( conn, ctp.get("symbol") or "", ctp.get("direction") or "long", ) - row = _compose_position_row( - conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, - ) - if row: - rows.append(row) + try: + row = _compose_position_row( + conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, + ) + if row: + rows.append(row) + except Exception as exc: + logger.warning("compose ctp row failed: %s", exc) seen: set[str] = set() deduped: list[dict] = [] @@ -589,7 +653,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _build_trading_live_payload(conn) -> dict: mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) - _sync_trade_monitors_with_ctp(conn, mode) + _ensure_monitors_from_ctp(conn, mode) rows = _build_trading_live_rows(conn) pending_orders = _build_pending_orders(conn, mode) capital = _capital(conn) @@ -638,7 +702,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se init_strategy_tables(conn) mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) - _sync_trade_monitors_with_ctp(conn, mode) capital = _capital(conn) risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode)) ctp_acc = _ctp_account(mode) if ctp_st.get("connected") else {} @@ -709,9 +772,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @app.route("/api/trading/live") @login_required def api_trading_live(): - cached = position_hub.get_snapshot() - if cached: - return jsonify(cached) conn = get_db() try: init_strategy_tables(conn) @@ -1314,7 +1374,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se init_strategy_tables(conn) mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) - _sync_trade_monitors_with_ctp(conn, mode) capital = _capital(conn) risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode)) conn.commit() diff --git a/position_stream.py b/position_stream.py index 15c04a4..a90f82d 100644 --- a/position_stream.py +++ b/position_stream.py @@ -91,7 +91,7 @@ def start_position_worker( r.get("stop_loss") is not None or r.get("take_profit") is not None for r in rows ) - if connected and in_session and (rows or has_sl_tp): + if connected and in_session: sleep_sec = max(1, interval) elif connected: sleep_sec = max(2, min(idle_interval, 3)) diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 141e0b4..588da81 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -63,11 +63,21 @@ def _match_symbol(ctp_sym: str, ths: str) -> bool: b = (ths or "").lower() if a == b: return True + if a and b and a.split(".")[0] == b.split(".")[0]: + return True try: vnpy_sym, _ = ths_to_vnpy_symbol(ths) - return a == vnpy_sym.lower() + if a == vnpy_sym.lower(): + return True except Exception: - return False + pass + try: + vnpy_sym, _ = ths_to_vnpy_symbol(ctp_sym) + if vnpy_sym.lower() == b.split(".")[0]: + return True + except Exception: + pass + return False def _close_order_direction(hold_direction: str) -> str: diff --git a/static/js/trade.js b/static/js/trade.js index 6c0b6a1..6e7d8f9 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -26,7 +26,7 @@ var selectedMaxLots = null; var recommendMaxByProduct = {}; var recommendMaxByCode = {}; - var POS_CACHE_KEY = 'qihuo_trading_live_v1'; + var POS_CACHE_KEY = 'qihuo_trading_live_v2'; function runWhenReady(fn) { if (document.readyState === 'loading') { diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 2c51f6a..5341878 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -105,6 +105,7 @@ class CtpBridge: self._commission_lists: dict[int, list] = {} self._commission_hooked = False self._subscribed: set[str] = set() + self._last_position_query_ts: float = 0.0 self._tick_hooked = False self._bar_generators: dict[str, Any] = {} self._bars_1m: dict[str, deque] = {} @@ -698,7 +699,7 @@ class CtpBridge: "accountid": getattr(acc, "accountid", ""), } - def list_positions(self) -> list[dict[str, Any]]: + def _collect_positions(self) -> list[dict[str, Any]]: if not self._engine: return [] out: list[dict[str, Any]] = [] @@ -706,12 +707,7 @@ class CtpBridge: vol = int(getattr(pos, "volume", 0) or 0) if vol <= 0: continue - direction = getattr(pos, "direction", None) - d = "long" - if direction is not None and str(direction).endswith("SHORT"): - d = "short" - elif direction is not None and "空" in str(direction): - d = "short" + d = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short" sym = getattr(pos, "symbol", "") or "" exchange = getattr(pos, "exchange", None) ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "") @@ -726,6 +722,30 @@ class CtpBridge: }) return out + def refresh_positions(self) -> None: + """向柜台查询持仓(内存为空时补拉)。""" + if not self._engine: + return + now = time.time() + if now - self._last_position_query_ts < 1.0: + return + self._last_position_query_ts = now + try: + gw = self._engine.get_gateway(GATEWAY_NAME) + td = getattr(gw, "td_api", None) + if td and hasattr(td, "query_position"): + td.query_position() + time.sleep(0.4) + except Exception as exc: + logger.debug("refresh_positions: %s", exc) + + def list_positions(self, *, refresh_if_empty: bool = True) -> list[dict[str, Any]]: + out = self._collect_positions() + if not out and refresh_if_empty: + self.refresh_positions() + out = self._collect_positions() + return out + def list_active_orders(self) -> list[dict[str, Any]]: if not self._engine: return [] @@ -907,10 +927,10 @@ def ctp_get_account(mode: str) -> dict[str, Any]: return b.get_account() -def ctp_list_positions(mode: str) -> list[dict[str, Any]]: +def ctp_list_positions(mode: str, *, refresh_if_empty: bool = True) -> list[dict[str, Any]]: b = get_bridge() b.ensure_connected(mode) - return b.list_positions() + return b.list_positions(refresh_if_empty=refresh_if_empty) def ctp_list_active_orders(mode: str) -> list[dict[str, Any]]: