diff --git a/ctp_trading_state.py b/ctp_trading_state.py new file mode 100644 index 0000000..36eb857 --- /dev/null +++ b/ctp_trading_state.py @@ -0,0 +1,201 @@ +# Copyright (c) 2025-2026 马建军. All rights reserved. +# 专有软件 — 未经授权禁止复制、传播、转售。 +# 详见 LICENSE.zh-CN.txt + +"""CTP 权威内存簿:委托、持仓、同步状态(事件增量 + 定期全量校准)。""" +from __future__ import annotations + +import logging +import threading +import time +from typing import Any, Callable, Optional + +logger = logging.getLogger(__name__) + +CALIBRATE_INTERVAL_SEC = 30.0 + + +def position_key(exchange: str, symbol: str, direction: str) -> str: + """统一持仓键:exchange|symbol|direction""" + ex = (exchange or "").strip().upper() + sym = (symbol or "").strip().lower() + d = (direction or "long").strip().lower() + if ex: + return f"{ex}|{sym}|{d}" + return f"{sym}|{d}" + + +def parse_position_key(key: str) -> tuple[str, str, str]: + parts = (key or "").split("|") + if len(parts) >= 3: + return parts[0], parts[1], parts[2] + if len(parts) == 2: + return "", parts[0], parts[1] + return "", (key or "").lower(), "long" + + +class CtpTradingState: + """进程内 CTP 快照:柜台回报为准,SQLite 仅挂 SL/TP 元数据。""" + + def __init__(self) -> None: + self._lock = threading.RLock() + self._orders: dict[str, dict[str, Any]] = {} + self._positions: dict[str, dict[str, Any]] = {} + self._tick_prices: dict[str, float] = {} + self._sync_state = "idle" + self._last_event_ts: float = 0.0 + self._last_calibrate_ts: float = 0.0 + self._on_change: Optional[Callable[[], None]] = None + + def set_change_callback(self, fn: Optional[Callable[[], None]]) -> None: + self._on_change = fn + + def _notify(self) -> None: + self._last_event_ts = time.time() + fn = self._on_change + if fn: + try: + fn() + except Exception as exc: + logger.debug("trading state change callback: %s", exc) + + @property + def sync_state(self) -> str: + with self._lock: + return self._sync_state + + def sync_label(self) -> str: + st = self.sync_state + if st == "syncing": + return "同步中…" + if st == "ready": + return "已同步" + return "" + + def begin_sync(self) -> None: + with self._lock: + self._sync_state = "syncing" + + def finish_sync(self) -> None: + with self._lock: + self._sync_state = "ready" + self._last_calibrate_ts = time.time() + + def needs_calibrate(self) -> bool: + with self._lock: + if self._sync_state == "idle": + return True + return (time.time() - self._last_calibrate_ts) >= CALIBRATE_INTERVAL_SEC + + def upsert_order(self, row: dict[str, Any], *, notify: bool = True) -> None: + oid = str(row.get("order_id") or row.get("vt_order_id") or "").strip() + if not oid: + return + with self._lock: + self._orders[oid] = dict(row) + if notify: + self._notify() + + def remove_order(self, order_id: str, *, notify: bool = True) -> None: + oid = (order_id or "").strip() + if not oid: + return + removed = False + with self._lock: + if oid in self._orders: + del self._orders[oid] + removed = True + else: + for k in list(self._orders.keys()): + if k == oid or k.endswith(oid) or oid.endswith(k): + del self._orders[k] + removed = True + break + if removed and notify: + self._notify() + + def upsert_position(self, row: dict[str, Any], *, notify: bool = True) -> None: + lots = int(row.get("lots") or 0) + ex = row.get("exchange") or "" + sym = row.get("symbol") or "" + direction = row.get("direction") or "long" + pk = position_key(ex, sym, direction) + with self._lock: + if lots <= 0: + self._positions.pop(pk, None) + else: + row = dict(row) + row["position_key"] = pk + self._positions[pk] = row + if notify: + self._notify() + + def remove_position(self, pk: str, *, notify: bool = True) -> None: + with self._lock: + self._positions.pop(pk, None) + if notify: + self._notify() + + def set_tick_price(self, exchange: str, symbol: str, price: float) -> None: + if not symbol or price <= 0: + return + key = f"{(exchange or '').upper()}|{symbol.lower()}" + with self._lock: + self._tick_prices[key] = float(price) + + def get_tick_price(self, exchange: str, symbol: str) -> Optional[float]: + key = f"{(exchange or '').upper()}|{symbol.lower()}" + with self._lock: + return self._tick_prices.get(key) + + def get_active_orders(self) -> list[dict[str, Any]]: + with self._lock: + return list(self._orders.values()) + + def get_positions(self) -> list[dict[str, Any]]: + with self._lock: + return list(self._positions.values()) + + def position_keys(self) -> set[str]: + with self._lock: + return set(self._positions.keys()) + + def clear(self) -> None: + with self._lock: + self._orders.clear() + self._positions.clear() + self._tick_prices.clear() + self._sync_state = "idle" + + def calibrate_from_lists( + self, + orders: list[dict[str, Any]], + positions: list[dict[str, Any]], + ) -> None: + """全量校准:以 vnpy 内存为准重建订单/持仓簿。""" + self.begin_sync() + new_orders: dict[str, dict[str, Any]] = {} + for o in orders or []: + oid = str(o.get("order_id") or o.get("vt_order_id") or "").strip() + if oid: + new_orders[oid] = dict(o) + new_positions: dict[str, dict[str, Any]] = {} + for p in positions or []: + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + ex = p.get("exchange") or "" + sym = p.get("symbol") or "" + direction = p.get("direction") or "long" + pk = position_key(ex, sym, direction) + row = dict(p) + row["position_key"] = pk + new_positions[pk] = row + with self._lock: + self._orders = new_orders + self._positions = new_positions + self.finish_sync() + self._notify() + + +trading_state = CtpTradingState() diff --git a/install_trading.py b/install_trading.py index 5890593..973bb50 100644 --- a/install_trading.py +++ b/install_trading.py @@ -84,6 +84,7 @@ from trading_context import ( trading_mode_label, ) from ctp_symbol import ths_to_vnpy_symbol +from ctp_trading_state import position_key, trading_state from vnpy_bridge import ( _ctp_td_lock, ctp_cancel_order, @@ -99,6 +100,7 @@ from vnpy_bridge import ( execute_order, get_bridge, set_position_refresh_callback, + set_tick_sl_tp_callback, ) @@ -207,11 +209,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _ctp_pos_to_ths_code(p: dict) -> str: sym = (p.get("symbol") or "").strip() + ex = (p.get("exchange") or "").strip() if not sym: return "" codes = ths_to_codes(sym) if codes: return codes.get("ths_code") or sym + if ex: + from vnpy_bridge import CtpBridge + ths = CtpBridge._vnpy_sym_to_ths(sym, ex) + if ths: + return ths return sym def _resolve_position_margin( @@ -334,7 +342,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se """CTP 有持仓但本地无监控时,自动补写一条 active 记录供展示。""" if not ctp_status(mode).get("connected"): return - for p in _ctp_positions(mode, refresh_if_empty=True): + ctp_positions = _ctp_positions(mode, refresh_if_empty=True) + for p in ctp_positions: lots = int(p.get("lots") or 0) if lots <= 0: continue @@ -342,7 +351,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ths = _ctp_pos_to_ths_code(p) if not ths: continue - existing = _find_active_monitor(conn, ths, direction) + existing = _find_or_revive_monitor(conn, ths, direction) if existing: _sync_monitor_from_ctp( conn, int(existing["id"]), ths, direction, mode, ctp=p, @@ -368,6 +377,43 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "UPDATE trade_order_monitors SET initial_stop_loss=? WHERE id=?", (initial_sl, mid), ) + if ctp_positions: + return + + def _restore_recent_pending_monitors(conn, mode: str) -> None: + """重启或 vnpy 委托缓存丢失时,恢复当日最近一笔可能仍有效的开仓挂单。""" + if not ctp_status(mode).get("connected"): + return + if conn.execute("SELECT 1 FROM trade_order_monitors WHERE status='pending' LIMIT 1").fetchone(): + return + today = datetime.now().strftime("%Y-%m-%d") + row = conn.execute( + """SELECT * FROM trade_order_monitors + WHERE status='closed' AND monitor_type='manual' + AND vt_order_id IS NOT NULL AND vt_order_id != '' + AND open_time LIKE ? + ORDER BY id DESC LIMIT 1""", + (f"{today}%",), + ).fetchone() + if not row: + return + mon = dict(row) + sym = mon.get("symbol") or "" + direction = (mon.get("direction") or "long").strip().lower() + if _find_active_monitor(conn, sym, direction): + return + for p in _ctp_positions(mode, refresh_if_empty=False): + if int(p.get("lots") or 0) <= 0: + continue + if (p.get("direction") or "long") != direction: + continue + if _match_ctp_symbol(p.get("symbol") or "", sym): + return + conn.execute( + "UPDATE trade_order_monitors SET status='pending' WHERE id=?", + (mon["id"],), + ) + logger.info("恢复挂单监控 id=%s sym=%s", mon.get("id"), sym) def _match_ctp_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() @@ -540,7 +586,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ctp_st = ctp_status(mode) if ctp_st.get("connected"): for o in _ctp_active_orders(mode): - sym = o.get("symbol") or "" + sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "") offset_s = (o.get("offset") or "").upper() kind = "limit" label = "委托挂单" @@ -548,6 +594,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se label = "平仓委托" pending.append({ "symbol_code": sym, + "symbol": _symbol_display_fields(sym).get("symbol_name") or sym, "direction": o.get("direction") or "long", "direction_label": "做多" if o.get("direction") == "long" else "做空", "lots": int(o.get("lots") or 0), @@ -556,6 +603,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "label": label, "source": "ctp", "order_id": o.get("order_id"), + "vt_order_id": o.get("vt_order_id") or o.get("order_id"), "can_cancel_order": is_trading_session(), "cancel_allowed": is_trading_session(), **_symbol_display_fields(sym), @@ -568,14 +616,50 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception: return [] - def _canonical_position_key(symbol: str, direction: str) -> str: + def _canonical_position_key(symbol: str, direction: str, exchange: str = "") -> str: sym = (symbol or "").strip() d = (direction or "long").strip().lower() + ex = (exchange or "").strip().upper() try: - vnpy_sym, _ = ths_to_vnpy_symbol(sym) - return f"{vnpy_sym.lower()}:{d}" + vnpy_sym, ex2 = ths_to_vnpy_symbol(sym) + sym = vnpy_sym + if not ex: + ex = ex2 except Exception: - return f"{sym.lower()}:{d}" + sym = sym.lower() + return position_key(ex, sym, d) + + def _position_key_from_ctp(p: dict) -> str: + return position_key( + p.get("exchange") or "", + p.get("symbol") or "", + p.get("direction") or "long", + ) + + def _monitor_position_key(mon: dict, exchange: str = "") -> str: + sym = (mon.get("symbol") or "").strip() + d = (mon.get("direction") or "long").strip().lower() + ex = (exchange or "").strip().upper() + try: + vnpy_sym, ex2 = ths_to_vnpy_symbol(sym) + sym = vnpy_sym + if not ex: + ex = ex2 + except Exception: + sym = sym.lower() + return position_key(ex, sym, d) + + def _monitors_by_position_key(conn) -> dict[str, dict]: + ensure_monitor_order_columns(conn) + out: dict[str, dict] = {} + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" + ).fetchall(): + mon = dict(r) + pk = _monitor_position_key(mon) + if pk not in out: + out[pk] = mon + return out def _find_active_monitor(conn, symbol: str, direction: str) -> Optional[dict]: direction = (direction or "long").strip().lower() @@ -589,6 +673,51 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return row return None + def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]: + """柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。""" + direction = (direction or "long").strip().lower() + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 40" + ).fetchall(): + row = dict(r) + if (row.get("direction") or "long") != direction: + continue + if not _match_ctp_symbol(symbol, row.get("symbol") or ""): + continue + if int(row.get("lots") or 0) <= 0: + continue + conn.execute( + "UPDATE trade_order_monitors SET status='active' WHERE id=?", + (row["id"],), + ) + row["status"] = "active" + logger.info( + "恢复误关闭监控 id=%s sym=%s dir=%s", + row.get("id"), row.get("symbol"), direction, + ) + return row + return None + + def _find_or_revive_monitor(conn, symbol: str, direction: str) -> Optional[dict]: + active = _find_active_monitor(conn, symbol, direction) + if active: + return active + return _revive_closed_monitor(conn, symbol, direction) + + def _close_all_monitors_for_sym_dir(conn, symbol: str, direction: str) -> None: + direction = (direction or "long").strip().lower() + for r in conn.execute( + "SELECT id, symbol, direction FROM trade_order_monitors " + "WHERE status IN ('active', 'pending')" + ).fetchall(): + if (r["direction"] or "long") != direction: + continue + if _match_ctp_symbol(symbol, r["symbol"] or ""): + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (r["id"],), + ) + def _close_duplicate_monitors(conn, symbol: str, direction: str, keep_id: int) -> None: direction = (direction or "long").strip().lower() for r in conn.execute( @@ -950,18 +1079,15 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "source": "monitor", "monitor_id": mon["id"] if mon else None, }) - row_key = _canonical_position_key(sym, direction) - ctp_st = ctp_status(mode) - sync_pending = ( - mon is not None - and ctp is None - and bool(ctp_st.get("connected")) + row_key = _canonical_position_key( + sym, direction, (ctp or {}).get("exchange") or "", ) return { "key": row_key, - "source": "ctp" if ctp else "local", + "position_key": row_key, + "source": "ctp", "source_label": source_label, - "sync_pending": sync_pending, + "sync_pending": False, "monitor_id": mon["id"] if mon else None, "symbol_code": sym, **_symbol_display_fields(sym), @@ -1073,6 +1199,69 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "trailing_r_locked": int(mon.get("trailing_r_locked") or 0), } + def _compose_ctp_open_order_row( + o: dict, + *, + mode: str, + capital: float, + now_iso: str, + ) -> Optional[dict]: + offset_u = (o.get("offset") or "").upper() + if offset_u and "OPEN" not in offset_u: + return None + sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "").strip() + direction = (o.get("direction") or "long").strip().lower() + lots = int(o.get("lots") or 0) + if not sym or lots <= 0: + return None + order_price = float(o.get("price") or 0) + pos_metrics = calc_position_metrics( + direction, order_price, order_price, order_price, lots, order_price, capital, sym, + ) + timeout_sec = get_pending_order_timeout_sec(get_setting) + return { + "key": f"{_canonical_position_key(sym, direction)}:pending:ctp:{o.get('order_id') or ''}", + "order_state": "pending", + "source": "ctp", + "source_label": "委托挂单", + "sync_pending": True, + "monitor_id": None, + "order_id": o.get("order_id"), + "vt_order_id": o.get("vt_order_id") or o.get("order_id"), + "symbol_code": sym, + **_symbol_display_fields(sym), + "direction": direction, + "direction_label": "做多" if direction == "long" else "做空", + "lots": lots, + "entry_price": order_price, + "order_price": order_price, + "stop_loss": None, + "take_profit": None, + "open_time": now_iso, + "holding_duration": None, + "mark_price": order_price, + "current_price": order_price, + "margin": pos_metrics.get("margin"), + "margin_source": "estimate", + "position_pct": pos_metrics.get("position_pct"), + "float_pnl": None, + "est_fee": None, + "can_close": False, + "close_allowed": False, + "can_cancel_order": is_trading_session(), + "cancel_allowed": is_trading_session(), + "pending_timeout_sec": timeout_sec, + "pending_timeout_min": max(1, timeout_sec // 60), + "sl_order_active": False, + "tp_order_active": False, + "sl_monitoring": False, + "tp_monitoring": False, + "can_place_orders": False, + "pending_orders": [], + "trailing_be": False, + "trailing_r_locked": 0, + } + def _reconcile_pending(conn, mode: str, *, capital: float = 0.0) -> None: reconcile_pending_orders( conn, @@ -1084,7 +1273,123 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se timeout_sec=get_pending_order_timeout_sec(get_setting), ) + def _build_active_orders( + conn, + *, + mode: str, + capital: float, + now_iso: str, + ) -> list[dict]: + """当前委托:以 CTP 柜台为准,本地 pending 开仓单合并展示。""" + orders: list[dict] = [] + seen_keys: set[str] = set() + + if ctp_status(mode).get("connected"): + ctp_orders = trading_state.get_active_orders() + if not ctp_orders: + ctp_orders = _ctp_active_orders(mode) + for o in ctp_orders: + try: + row = _compose_ctp_open_order_row( + o, mode=mode, capital=capital, now_iso=now_iso, + ) + if not row: + row = _compose_ctp_order_row_any( + o, mode=mode, capital=capital, now_iso=now_iso, + ) + if row: + orders.append(row) + seen_keys.add(row.get("key") or "") + except Exception as exc: + logger.warning("compose ctp order row failed: %s", exc) + + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC" + ).fetchall(): + mon = dict(r) + try: + prow = _compose_pending_row( + mon, mode=mode, capital=capital, now_iso=now_iso, + ) + if prow and prow.get("key") not in seen_keys: + pk = f"{prow.get('symbol_code') or ''}:{prow.get('direction') or ''}" + dup = any( + (x.get("symbol_code") or "") + ":" + (x.get("direction") or "") == pk + and x.get("order_state") == "pending" + for x in orders + ) + if not dup: + orders.append(prow) + except Exception as exc: + logger.warning("compose pending order row failed: %s", exc) + return orders + + def _compose_ctp_order_row_any( + o: dict, + *, + mode: str, + capital: float, + now_iso: str, + ) -> Optional[dict]: + """CTP 任意未成交委托(含平仓)。""" + sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "").strip() + direction = (o.get("direction") or "long").strip().lower() + lots = int(o.get("lots") or 0) + if not sym or lots <= 0: + return None + offset_u = (o.get("offset") or "").upper() + is_open = not offset_u or "OPEN" in offset_u + order_price = float(o.get("price") or 0) + pos_metrics = calc_position_metrics( + direction, order_price, order_price, order_price, lots, order_price, capital, sym, + ) + label = "开仓委托" if is_open else "平仓委托" + timeout_sec = get_pending_order_timeout_sec(get_setting) + ex = o.get("exchange") or "" + pk = _canonical_position_key(sym, direction, ex) + return { + "key": f"{pk}:order:{o.get('order_id') or ''}", + "order_state": "pending", + "source": "ctp", + "source_label": label, + "sync_pending": False, + "monitor_id": None, + "order_id": o.get("order_id"), + "vt_order_id": o.get("vt_order_id") or o.get("order_id"), + "symbol_code": sym, + **_symbol_display_fields(sym), + "direction": direction, + "direction_label": "做多" if direction == "long" else "做空", + "lots": lots, + "entry_price": order_price, + "order_price": order_price, + "stop_loss": None, + "take_profit": None, + "open_time": now_iso, + "mark_price": order_price, + "current_price": order_price, + "margin": pos_metrics.get("margin"), + "margin_source": "estimate", + "position_pct": pos_metrics.get("position_pct"), + "float_pnl": None, + "can_close": False, + "close_allowed": False, + "can_cancel_order": is_trading_session(), + "cancel_allowed": is_trading_session(), + "pending_timeout_sec": timeout_sec if is_open else None, + "pending_timeout_min": max(1, timeout_sec // 60) if is_open else None, + "sl_order_active": False, + "tp_order_active": False, + "sl_monitoring": False, + "tp_monitoring": False, + "can_place_orders": False, + "pending_orders": [], + "trailing_be": False, + "trailing_r_locked": 0, + } + def _build_trading_live_rows(conn, *, fast: bool = False) -> list[dict]: + """当前持仓:以 CTP 为准,SQLite 仅叠加 SL/TP 元数据。""" from zoneinfo import ZoneInfo tz = ZoneInfo("Asia/Shanghai") now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") @@ -1092,127 +1397,73 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se capital = _capital(conn) ensure_monitor_order_columns(conn) - monitors_raw = [ - dict(r) for r in conn.execute( - "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" - ).fetchall() - ] - monitor_by_key: dict[str, dict] = {} - for mon in monitors_raw: - key = _canonical_position_key(mon.get("symbol") or "", mon.get("direction") or "long") - if key not in monitor_by_key: - monitor_by_key[key] = mon - - ctp_list: list[dict] = ( - _ctp_positions(mode, refresh_if_empty=not fast, refresh_margin=not fast) - if ctp_status(mode).get("connected") else [] - ) - ctp_by_key: dict[str, dict] = {} - for p in ctp_list: - if int(p.get("lots") or 0) <= 0: - continue - key = _canonical_position_key(p.get("symbol") or "", p.get("direction") or "long") - ctp_by_key[key] = p + monitor_by_pk = _monitors_by_position_key(conn) + ctp_list: list[dict] = [] + if ctp_status(mode).get("connected"): + ctp_list = trading_state.get_positions() + if not ctp_list: + ctp_list = _ctp_positions( + mode, refresh_if_empty=not fast, refresh_margin=not fast, + ) rows: list[dict] = [] - used_ctp_keys: set[str] = set() - - for key, mon in monitor_by_key.items(): - ctp = ctp_by_key.get(key) - if not ctp: - for ck, cp in ctp_by_key.items(): - if ck in used_ctp_keys: + for p in ctp_list: + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + pk = p.get("position_key") or _position_key_from_ctp(p) + mon = monitor_by_pk.get(pk) + if not mon: + for mk, mv in monitor_by_pk.items(): + if (mv.get("direction") or "long") != (p.get("direction") or "long"): continue - if (cp.get("direction") or "long") != (mon.get("direction") or "long"): - continue - if _match_ctp_symbol(cp.get("symbol") or "", mon.get("symbol") or ""): - ctp = cp - used_ctp_keys.add(ck) + if _match_ctp_symbol(p.get("symbol") or "", mv.get("symbol") or ""): + mon = mv break - elif key in ctp_by_key: - used_ctp_keys.add(key) - if ctp and mon and not fast: + ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") + if mon and not fast: _sync_monitor_from_ctp( - conn, int(mon["id"]), mon.get("symbol") or "", - mon.get("direction") or "long", mode, ctp=ctp, - capital=capital, + conn, int(mon["id"]), mon.get("symbol") or ths, + mon.get("direction") or p.get("direction") or "long", + mode, ctp=p, capital=capital, ) - mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon + mon = _find_active_monitor( + conn, mon.get("symbol") or ths, mon.get("direction") or "long", + ) or mon try: row = _compose_position_row( - conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, - fast=fast, + conn, mon=mon, ctp=p, mode=mode, capital=capital, + now_iso=now_iso, fast=fast, ) if row: rows.append(row) except Exception as exc: - logger.warning("compose monitor row failed: %s", exc) - - for key, ctp in ctp_by_key.items(): - if key in used_ctp_keys: - continue - matched = False - for uk in used_ctp_keys: - if uk == key: - matched = True - break - if matched: - continue - for existing in rows: - if _match_ctp_symbol( - ctp.get("symbol") or "", existing.get("symbol_code") or "", - ) and (ctp.get("direction") or "long") == (existing.get("direction") or "long"): - matched = True - break - if matched: - continue - mon = _find_active_monitor( - conn, ctp.get("symbol") or "", ctp.get("direction") or "long", - ) - try: - row = _compose_position_row( - conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, - fast=fast, - ) - if row: - rows.append(row) - except Exception as exc: - logger.warning("compose ctp row failed: %s", exc) + logger.warning("compose ctp position row failed: %s", exc) seen: set[str] = set() deduped: list[dict] = [] for row in rows: - rk = row.get("key") or f"{row.get('symbol_code')}:{row.get('direction')}" + rk = row.get("key") or row.get("position_key") or "" if rk in seen: continue seen.add(rk) deduped.append(row) - - pending_raw = [ - dict(r) for r in conn.execute( - "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC" - ).fetchall() - ] - for mon in pending_raw: - try: - prow = _compose_pending_row( - mon, mode=mode, capital=capital, now_iso=now_iso, - ) - if prow: - deduped.insert(0, prow) - except Exception as exc: - logger.warning("compose pending row failed: %s", exc) return deduped def _build_trading_live_payload(conn, *, fast: bool = False) -> dict: + from zoneinfo import ZoneInfo + tz = ZoneInfo("Asia/Shanghai") + now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) capital = _capital(conn) - if not fast and ctp_st.get("connected"): + if ctp_st.get("connected") and not fast: _reconcile_pending(conn, mode, capital=capital) - if ctp_st.get("connected"): - _ensure_monitors_from_ctp(conn, mode) + _sync_trade_monitors_with_ctp(conn, mode) rows = _build_trading_live_rows(conn, fast=fast) + active_orders = _build_active_orders( + conn, mode=mode, capital=capital, now_iso=now_iso, + ) rows = _apply_account_margin_to_rows(rows, mode, capital) _persist_ctp_snapshot_to_monitors(conn, rows, mode) pending_orders = _build_pending_orders(conn, mode) @@ -1220,6 +1471,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return { "ok": True, "rows": rows, + "active_orders": active_orders, "pending_orders": pending_orders, "capital": capital, "ctp_status": ctp_st, @@ -1227,16 +1479,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "risk_status": risk, "trading_session": is_trading_session(), "pending_order_timeout_min": get_pending_order_timeout_min(get_setting), + "sync_state": trading_state.sync_state, + "sync_label": trading_state.sync_label(), } def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict: mode = get_trading_mode(get_setting) - if not fast and ctp_status(mode).get("connected"): - try: - with _ctp_td_lock: - get_bridge().refresh_positions() - except Exception as exc: - logger.debug("refresh positions before snapshot: %s", exc) + if ctp_status(mode).get("connected"): + if not fast: + try: + with _ctp_td_lock: + get_bridge().calibrate_trading_state() + except Exception as exc: + logger.debug("refresh calibrate: %s", exc) + for p in trading_state.get_positions() or _ctp_positions(mode, refresh_if_empty=False): + ths = _ctp_pos_to_ths_code(p) + if ths: + try: + get_bridge().subscribe_symbol(ths) + except Exception: + pass conn = get_db() try: init_strategy_tables(conn) @@ -1246,7 +1508,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se finally: conn.close() - def _push_position_snapshot_async(*, fast: bool = False) -> None: + def _push_position_snapshot_async(*, fast: bool = True) -> None: def _run() -> None: try: payload = _refresh_trading_live_snapshot(fast=fast) @@ -1256,15 +1518,43 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se threading.Thread(target=_run, daemon=True).start() + def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None: + from sl_tp_guard import check_sl_tp_on_tick + from db_conn import DB_PATH, connect_db + + mode = get_trading_mode(get_setting) + if not ctp_status(mode).get("connected"): + return + conn = connect_db(DB_PATH) + try: + _init_tables(conn) + capital = _capital(conn) + n = check_sl_tp_on_tick( + conn, mode, exchange, symbol, price, + capital=capital, notify_fn=send_wechat_msg, + be_tick_mult=get_trailing_be_tick_buffer(get_setting), + ) + if n: + conn.commit() + _push_position_snapshot_async(fast=True) + except Exception as exc: + logger.debug("tick sl/tp: %s", exc) + finally: + conn.close() + def _bootstrap_trading_runtime() -> None: - """进程启动:立刻读库展示持仓,并异步连 CTP。""" + """进程启动:读 CTP 快照推送,事件驱动增量 + 定期全量校准。""" set_position_refresh_callback( - lambda: _push_position_snapshot_async(fast=False) + lambda: _push_position_snapshot_async(fast=True) ) + set_tick_sl_tp_callback(_on_tick_sl_tp) def _warm() -> None: try: - payload = _refresh_trading_live_snapshot(fast=True) + mode = get_trading_mode(get_setting) + if ctp_status(mode).get("connected"): + get_bridge().calibrate_trading_state() + payload = _refresh_trading_live_snapshot(fast=False) position_hub.set_snapshot(payload) position_hub.broadcast("positions", payload) except Exception as exc: @@ -1346,6 +1636,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se try: init_strategy_tables(conn) payload = _build_trading_live_payload(conn, fast=True) + conn.commit() position_hub.set_snapshot(payload) return jsonify(payload) finally: @@ -1656,12 +1947,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se symbol_name=(mon.get("symbol_name") or "") if mon else "", market_code=(mon.get("market_code") or "") if mon else "", ) - if mid: - _close_duplicate_monitors(conn, sym, direction, mid) - conn.execute( - "UPDATE trade_order_monitors SET status='closed' WHERE id=?", - (mid,), - ) + _close_all_monitors_for_sym_dir(conn, sym, direction) conn.commit() try: from ctp_trade_sync import sync_trade_logs_from_ctp @@ -2449,26 +2735,34 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se interval=1, ) _pos_refresh_tick = {"n": 0} + _last_full_calibrate = {"ts": 0.0} def _position_worker_refresh() -> dict: + import time as _time + from ctp_trading_state import CALIBRATE_INTERVAL_SEC + _pos_refresh_tick["n"] += 1 mode = get_trading_mode(get_setting) connected = bool(ctp_status(mode).get("connected")) - # 已连接时每 2 秒完整对账;未连接时每 5 秒轻量刷新(禁止 query_position) - if connected: - use_fast = _pos_refresh_tick["n"] % 2 != 0 - else: - use_fast = _pos_refresh_tick["n"] % 5 != 0 - payload = _refresh_trading_live_snapshot(fast=use_fast) - if connected and use_fast and any( - r.get("sync_pending") for r in (payload.get("rows") or []) - ): + now = _time.time() + need_full = ( + connected + and ( + trading_state.needs_calibrate() + or (now - _last_full_calibrate["ts"]) >= CALIBRATE_INTERVAL_SEC + ) + ) + if need_full: + _last_full_calibrate["ts"] = now payload = _refresh_trading_live_snapshot(fast=False) + else: + payload = _refresh_trading_live_snapshot(fast=True) return payload start_position_worker( refresh_fn=_position_worker_refresh, - interval=1, + interval=2, + idle_interval=5, ) _bootstrap_trading_runtime() start_ctp_fee_worker( diff --git a/order_pending.py b/order_pending.py index c9f807c..949a9c5 100644 --- a/order_pending.py +++ b/order_pending.py @@ -80,6 +80,37 @@ def _find_ctp_position(positions: list[dict], sym: str, direction: str) -> Optio return None +def _vt_order_in_active(vt_oid: str, active_orders: dict[str, dict]) -> bool: + oid = (vt_oid or "").strip() + if not oid: + return False + if oid in active_orders: + return True + tail = oid.rsplit("_", 1)[-1] + for key in active_orders: + if key == oid or key.endswith(tail) or oid.endswith(key): + return True + return False + + +def _symbol_open_order_active( + orders: list[dict], + sym: str, + direction: str, + match_fn: Callable[[str, str], bool], +) -> Optional[dict]: + direction = (direction or "long").strip().lower() + for o in orders or []: + offset_u = (o.get("offset") or "").upper() + if offset_u and "OPEN" not in offset_u: + continue + if (o.get("direction") or "long") != direction: + continue + if match_fn(o.get("symbol") or "", sym): + return o + return None + + def reconcile_pending_orders( conn, mode: str, @@ -103,13 +134,15 @@ def reconcile_pending_orders( else [] ) try: - active_orders = { - str(o.get("order_id") or ""): o - for o in ctp_list_active_orders(mode) - if o.get("order_id") - } + active_order_list = ctp_list_active_orders(mode) + active_orders = {} + for o in active_order_list: + for key in (o.get("order_id"), o.get("vt_order_id")): + if key: + active_orders[str(key)] = o except Exception as exc: logger.debug("list active orders: %s", exc) + active_order_list = [] active_orders = {} rows = conn.execute( @@ -137,7 +170,7 @@ def reconcile_pending_orders( stats["promoted"] += 1 continue - if vt_oid and vt_oid in active_orders: + if vt_oid and _vt_order_in_active(vt_oid, active_orders): if age >= limit_sec and is_trading_session(): if ctp_cancel_order(mode, vt_oid): conn.execute( @@ -149,13 +182,42 @@ def reconcile_pending_orders( logger.warning("pending auto-cancel failed monitor=%s order=%s", mid, vt_oid) continue - # 委托已不在活跃列表且无持仓:拒单/撤单/过期 - if age >= 8: - conn.execute( - "UPDATE trade_order_monitors SET status='closed' WHERE id=?", - (mid,), - ) - stats["closed"] += 1 + live_open = _symbol_open_order_active(active_order_list, sym, direction, match) + if live_open or (vt_oid and age < limit_sec): + if live_open and age >= limit_sec and is_trading_session(): + cancel_oid = ( + vt_oid + or live_open.get("vt_order_id") + or live_open.get("order_id") + or "" + ) + if cancel_oid and ctp_cancel_order(mode, cancel_oid): + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (mid,), + ) + stats["cancelled"] += 1 + continue + + if age >= limit_sec: + if vt_oid and is_trading_session(): + if ctp_cancel_order(mode, vt_oid): + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (mid,), + ) + stats["cancelled"] += 1 + else: + logger.info( + "pending monitor=%s order=%s kept (cancel not confirmed)", + mid, vt_oid, + ) + elif not vt_oid: + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (mid,), + ) + stats["closed"] += 1 if any(stats.values()): conn.commit() diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 721423f..f6e1d34 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -25,6 +25,7 @@ from vnpy_bridge import ( ctp_list_active_orders, ctp_list_positions, ctp_status, + ctp_account_margin_used, execute_order, get_bridge, ) @@ -564,6 +565,13 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120 """持仓已平时:关闭监控并撤销残留止盈止损挂单(新开仓 grace_sec 内不清理)。""" if not ctp_status(mode).get("connected"): return 0 + try: + bridge = get_bridge() + since_connect = time.time() - float(getattr(bridge, "_last_connect_ok_ts", 0) or 0) + if since_connect < 90: + return 0 + except Exception: + pass positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False) position_keys: set[tuple[str, str]] = set() for p in positions: @@ -573,14 +581,9 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120 direction = p.get("direction") or "long" position_keys.add((sym, direction)) - if not position_keys: - try: - acc = get_bridge().get_account() - margin_used = float(acc.get("balance") or 0) - float(acc.get("available") or 0) - if margin_used > 500: - return 0 - except Exception: - return 0 + margin_used = ctp_account_margin_used(mode) or 0.0 + if not position_keys and margin_used > 300: + return 0 now_ts = time.time() @@ -669,6 +672,91 @@ def _execute_local_close( logger.debug("SL/TP notify failed: %s", exc) +def check_sl_tp_on_tick( + conn, + mode: str, + exchange: str, + symbol: str, + mark: float, + *, + capital: float = 0.0, + notify_fn: Callable[[str], None] | None = None, + be_tick_mult: int = 2, +) -> int: + """EVENT_TICK 触发:仅检查与 tick 品种匹配的 active 监控。""" + ensure_monitor_order_columns(conn) + if not ctp_status(mode).get("connected") or not is_trading_session(): + return 0 + if mark <= 0: + return 0 + sym_l = (symbol or "").lower() + ex_u = (exchange or "").upper() + closed = 0 + rows = [dict(r) for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active'" + ).fetchall()] + for mon in rows: + mid = int(mon.get("id") or 0) + ms = (mon.get("symbol") or "").strip() + direction = (mon.get("direction") or "long").strip().lower() + try: + vnpy_sym, ex2 = ths_to_vnpy_symbol(ms) + if sym_l != vnpy_sym.lower(): + continue + if ex_u and ex2 and ex_u != ex2.upper(): + continue + except Exception: + if sym_l != ms.lower(): + continue + + sl = mon.get("stop_loss") + tp = mon.get("take_profit") + try: + sl_f = float(sl) if sl is not None else None + tp_f = float(tp) if tp is not None else None + except (TypeError, ValueError): + sl_f, tp_f = None, None + if sl_f is None and tp_f is None: + continue + + positions = ctp_list_positions(mode) + if not _find_position(positions, ms, direction): + continue + + tick = _tick_size(ms) + if mon.get("trailing_be"): + mon = _update_trailing_stop_loss(conn, mon, mark, be_tick_mult=be_tick_mult) + try: + sl_f = float(mon["stop_loss"]) if mon.get("stop_loss") is not None else sl_f + except (TypeError, ValueError): + pass + + reason = None + if tp_f is not None and _tp_triggered(direction, tp_f, mark, tick): + reason = "take_profit" + elif sl_f is not None and _sl_triggered(direction, sl_f, mark, tick): + reason = "stop_loss" + if not reason: + continue + if mid > 0 and not _can_close_now(mid): + continue + if not _try_acquire_close_symbol(ms, direction): + continue + try: + _execute_local_close( + conn, mon, mode=mode, mark=mark, reason=reason, + capital=capital, notify_fn=notify_fn, + ) + if mid > 0: + _mark_close_attempt(mid) + closed += 1 + except Exception as exc: + logger.warning("SL/TP tick close failed monitor=%s: %s", mid, exc) + finally: + _release_close_symbol(ms, direction) + return closed + + def check_monitors_locally( conn, mode: str, diff --git a/static/css/trade.css b/static/css/trade.css index c0c7067..5a7334e 100644 --- a/static/css/trade.css +++ b/static/css/trade.css @@ -2,7 +2,9 @@ /* 持仓监控页 — 与 split-grid(关键位监控)同宽,全端自适应 */ .trade-page{width:100%} .trade-split{margin-bottom:1.25rem} -.trade-split .card{min-height:480px} +.trade-split .card{min-height:320px} +.trade-split .trade-card#order{margin-bottom:.75rem} +.sync-badge{font-size:.72rem;font-weight:400;margin-left:.35rem} .trade-top-bar{ display:flex;flex-wrap:wrap;gap:.65rem 1rem; align-items:center;justify-content:space-between; diff --git a/static/js/trade.js b/static/js/trade.js index 6a860b2..f89e240 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -6,6 +6,8 @@ var sizingMode = window.TRADE_SIZING_MODE || 'fixed'; if (sizingMode === 'risk') sizingMode = 'amount'; var list = document.getElementById('position-live-list'); + var orderList = document.getElementById('order-live-list'); + var syncBadge = document.getElementById('sync-badge'); var recommendList = document.getElementById('recommend-list'); var symInput = document.getElementById('trade-symbol'); var dirSelect = document.getElementById('trade-direction'); @@ -44,7 +46,7 @@ var REC_COLSPAN = 18; var marketNavEnabled = !!window.MARKET_NAV_ENABLED; var productCategories = window.PRODUCT_CATEGORIES || []; - var POS_CACHE_KEY = 'qihuo_trading_live_v3'; + var POS_CACHE_KEY = 'qihuo_trading_live_v4'; function runWhenReady(fn) { if (document.readyState === 'loading') { @@ -156,8 +158,21 @@ return !!(msg && (msg.indexOf('不可达') >= 0 || msg.indexOf('Connection refused') >= 0 || msg.indexOf('timed out') >= 0)); } + function applyActiveOrders(orders) { + if (!orderList) return; + orders = orders || []; + if (!orders.length) { + orderList.innerHTML = '
暂无委托。
'; + return; + } + orderList.innerHTML = orders.map(buildPendingOrderCard).join(''); + bindPendingDismiss(orderList); + bindCancelOpenButtons(orderList); + bindCancelOrderButtons(orderList); + } + function applyPositionsData(data) { - if (!list || !data) return; + if (!data) return; var cap = document.getElementById('cap-display'); if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2); var connected = data.ctp_status && data.ctp_status.connected; @@ -168,6 +183,16 @@ ctpConnecting = !!connecting; isTradingSession = !!data.trading_session; syncCtpBadgeFromStatus(data.ctp_status || { connected: connected, connecting: connecting }); + if (syncBadge) { + if (data.sync_label && connected) { + syncBadge.hidden = false; + syncBadge.textContent = data.sync_label; + syncBadge.className = 'sync-badge ' + (data.sync_state === 'syncing' ? 'text-accent' : 'text-muted'); + } else { + syncBadge.hidden = true; + syncBadge.textContent = ''; + } + } if (!connected && !connecting && data.ctp_status && data.ctp_status.last_error) { showCtpError(data.ctp_status.last_error); if (isCtpLoginBanError(data.ctp_status.last_error)) { @@ -181,10 +206,14 @@ riskBadge.textContent = data.risk_status.status_label || ''; riskBadge.className = 'badge ' + (data.risk_status.can_trade ? 'profit' : 'loss'); } - var rows = data.rows || []; + applyActiveOrders(data.active_orders || []); + if (!list) return; + var rows = (data.rows || []).filter(function (row) { + return row.order_state !== 'pending'; + }); var seenKeys = {}; rows = rows.filter(function (row) { - var k = row.key || ((row.symbol_code || '') + ':' + (row.direction || '')); + var k = row.key || row.position_key || ((row.symbol_code || '') + ':' + (row.direction || '')); if (seenKeys[k]) return false; seenKeys[k] = true; return true; @@ -210,36 +239,7 @@ tryAutoCtpReconnect(); return; } - var pendingOnly = data.pending_orders || []; - if (pendingOnly.length) { - list.innerHTML = '
暂无持仓
' + - pendingOnly.map(function (p) { - var cancelAllowed = p.cancel_allowed !== false && isTradingSession; - var actionBtn = ''; - if (p.monitor_id) { - actionBtn = ''; - } else if (p.order_id && p.source === 'ctp') { - actionBtn = ''; - } - return ( - '
' + (p.label || '挂单') + ' · ' + (p.symbol || p.symbol_code) + '' + - '' + fmtNum(p.price) + ' · ' + - (p.lots || 1) + ' 手' + actionBtn + '
' - ); - }).join(''); - bindPendingDismiss(list); - bindCancelOrderButtons(list); - } else { - list.innerHTML = '
暂无持仓。
'; - } + list.innerHTML = '
暂无持仓。
'; return; } if (!connected) { @@ -839,22 +839,32 @@ ? row.pending_timeout_min : (row.auto_cancel_sec != null ? Math.max(1, Math.ceil(row.auto_cancel_sec / 60)) : 5); var cancelAllowed = row.cancel_allowed !== false && isTradingSession; - var cancelBtn = row.can_cancel_order ? - '' : ''; + var cancelBtn = ''; + if (row.can_cancel_order) { + if (row.monitor_id) { + cancelBtn = ''; + } else if (row.order_id || row.vt_order_id) { + cancelBtn = ''; + } + } + var pendingLabel = row.source_label || '挂单中'; + var isCloseOrder = pendingLabel.indexOf('平仓') >= 0; var metaLine = - '状态 挂单中' + + '状态 ' + pendingLabel + '' + ' · 委托价 ' + fmtNum(orderPx) + '' + (row.rr_ratio != null ? ' · 盈亏比 ' + row.rr_ratio + ':1' : '') + - ' · ' + slTpStatusHtml(row) + - ' · 移动保本 ' + trailingStatusHtml(row) + - ' · 约 ' + remainMin + ' 分钟内未成交自动撤单'; + (row.stop_loss != null || row.take_profit != null ? ' · ' + slTpStatusHtml(row) : '') + + (row.trailing_be ? ' · 移动保本 ' + trailingStatusHtml(row) : '') + + (!isCloseOrder ? ' · 约 ' + remainMin + ' 分钟内未成交自动撤单' : ''); return ( '
' + '
' + posSymbolTitleHtml(row, ' ' + dirBadge + '' + - ' 挂单中') + '
' + + ' ' + (isCloseOrder ? '平仓委托' : '挂单中') + '') + '
' + '
' + posSymbolSubHtml(row) + '
' + '
' + cancelBtn + '
' + '
' + metaLine + '
' + diff --git a/templates/trade.html b/templates/trade.html index 5f8b5fc..484a8b0 100644 --- a/templates/trade.html +++ b/templates/trade.html @@ -105,11 +105,19 @@ +
+

当前委托

+

委托以 CTP 柜台为准;未成交可撤单,超时自动撤开仓单。

+
+
加载委托…
+
+
+

当前持仓

-

开仓委托先显示「挂单中」,柜台成交后写入监控;超过 {{ pending_order_timeout_min }} 分钟未成交自动撤单,可手动撤单。

+

持仓以 CTP 柜台为准;止盈止损为程序本地监控,触发后市价平仓。

-
加载本地持仓…
+
加载持仓…
diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 38cb5ac..d794f2a 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -73,6 +73,7 @@ def _load_persisted_last_error() -> str: return (get_setting(CTP_LAST_ERROR_KEY, "") or "").strip() _position_refresh_callback: Optional[Callable[[], None]] = None +_tick_sl_tp_callback: Optional[Callable[[str, str, float], None]] = None _position_refresh_debounce_lock = threading.Lock() _position_refresh_debounce_ts: float = 0.0 @@ -82,6 +83,12 @@ def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None: _position_refresh_callback = fn +def set_tick_sl_tp_callback(fn: Optional[Callable[[str, str, float], None]]) -> None: + """注册 tick 回调:exchange, symbol, last_price → 本地 SL/TP 触发。""" + global _tick_sl_tp_callback + _tick_sl_tp_callback = fn + + def _fire_position_refresh_callback() -> None: fn = _position_refresh_callback if not fn: @@ -223,6 +230,8 @@ class CtpBridge: self._last_connect_ok_ts: float = 0.0 self._tick_hooked = False self._position_hooked = False + self._order_hooked = False + self._trade_hooked = False self._bar_generators: dict[str, Any] = {} self._bars_1m: dict[str, deque] = {} self._init_engine() @@ -238,6 +247,8 @@ class CtpBridge: self._engine = MainEngine(self._ee) self._engine.add_gateway(CtpGateway) self._ensure_position_event_hook() + self._ensure_order_event_hook() + self._ensure_trade_event_hook() except ImportError: self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp" except Exception as exc: @@ -253,17 +264,29 @@ class CtpBridge: def _on_position(event) -> None: try: + from ctp_trading_state import trading_state + pos = event.data - vol = int(getattr(pos, "volume", 0) or 0) - if vol <= 0: - return + row = self._position_row_from_vnpy(pos) + if row: + trading_state.upsert_position(row, notify=False) sym = getattr(pos, "symbol", "") or "" d = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short" - for attr in ("margin", "use_margin", "UseMargin"): - raw = float(getattr(pos, attr, 0) or 0) - if raw > 0: - self._position_margins[self._position_margin_key(sym, d)] = raw - break + vol = int(getattr(pos, "volume", 0) or 0) + if vol <= 0: + exchange = getattr(pos, "exchange", None) + ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "") + from ctp_trading_state import position_key + + trading_state.remove_position( + position_key(ex_name, sym, d), notify=False, + ) + else: + for attr in ("margin", "use_margin", "UseMargin"): + raw = float(getattr(pos, attr, 0) or 0) + if raw > 0: + self._position_margins[self._position_margin_key(sym, d)] = raw + break except Exception as exc: logger.debug("position margin cache: %s", exc) _fire_position_refresh_callback_debounced() @@ -271,6 +294,137 @@ class CtpBridge: self._ee.register(EVENT_POSITION, _on_position) self._position_hooked = True + def _ensure_order_event_hook(self) -> None: + if self._order_hooked or not self._ee: + return + try: + from vnpy.trader.event import EVENT_ORDER + except ImportError: + return + + def _on_order(event) -> None: + try: + from ctp_trading_state import trading_state + + order = event.data + row = self._order_row_from_vnpy(order) + if not row: + return + status_s = str(row.get("status") or "") + terminal = any( + x in status_s + for x in ("ALLTRADED", "CANCELLED", "REJECTED", "全部成交", "已撤销", "拒单") + ) + oid = str(row.get("order_id") or row.get("vt_order_id") or "") + if terminal or int(row.get("lots") or 0) <= 0: + trading_state.remove_order(oid, notify=False) + else: + trading_state.upsert_order(row, notify=False) + except Exception as exc: + logger.debug("order event: %s", exc) + _fire_position_refresh_callback_debounced(min_interval=0.2) + + self._ee.register(EVENT_ORDER, _on_order) + self._order_hooked = True + + def _ensure_trade_event_hook(self) -> None: + if self._trade_hooked or not self._ee: + return + try: + from vnpy.trader.event import EVENT_TRADE + except ImportError: + return + + def _on_trade(event) -> None: + try: + trade = event.data + row = self._trade_row_from_vnpy(trade) + if row and row.get("offset") == "open": + sym = row.get("symbol") or "" + pd = row.get("position_direction") or "long" + dt = row.get("datetime") or "" + if sym and dt: + self._position_open_times[self._position_margin_key(sym, pd)] = dt + except Exception as exc: + logger.debug("trade event: %s", exc) + _fire_position_refresh_callback_debounced(min_interval=0.2) + + self._ee.register(EVENT_TRADE, _on_trade) + self._trade_hooked = True + + def _order_row_from_vnpy(self, order: Any) -> Optional[dict[str, Any]]: + try: + status = getattr(order, "status", None) + status_s = str(status) + vol = int(getattr(order, "volume", 0) or 0) + traded = int(getattr(order, "traded", 0) or 0) + remain = max(0, vol - traded) + direction = getattr(order, "direction", None) + d = "long" + if direction is not None and str(direction).endswith("SHORT"): + d = "short" + offset = getattr(order, "offset", None) + sym = getattr(order, "symbol", "") or "" + exchange = getattr(order, "exchange", None) + ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "") + vt_oid = str(getattr(order, "vt_orderid", "") or "") + order_id = str(getattr(order, "orderid", "") or "") + return { + "symbol": sym, + "exchange": ex_name, + "direction": d, + "lots": remain, + "price": float(getattr(order, "price", 0) or 0), + "offset": str(offset or ""), + "order_id": vt_oid or order_id, + "vt_order_id": vt_oid, + "status": status_s, + } + except Exception as exc: + logger.debug("order_row_from_vnpy: %s", exc) + return None + + def _position_row_from_vnpy(self, pos: Any) -> Optional[dict[str, Any]]: + try: + vol = int(getattr(pos, "volume", 0) or 0) + d = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short" + sym = getattr(pos, "symbol", "") or "" + exchange = getattr(pos, "exchange", None) + ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "") + price = float(getattr(pos, "price", 0) or 0) + yd = int(getattr(pos, "yd_volume", 0) or 0) + td = max(0, vol - yd) + margin = self.estimate_position_margin(sym, ex_name, d, vol, price, pos=pos) + open_time = self._lookup_position_open_time(sym, d) or None + return { + "symbol": sym, + "exchange": ex_name, + "direction": d, + "lots": vol, + "avg_price": price, + "pnl": float(getattr(pos, "pnl", 0) or 0), + "frozen": int(getattr(pos, "frozen", 0) or 0), + "margin": margin, + "open_time": open_time, + "yd_volume": yd, + "td_volume": td, + } + except Exception as exc: + logger.debug("position_row_from_vnpy: %s", exc) + return None + + def calibrate_trading_state(self) -> None: + """全量校准内存簿(读 vnpy 缓存,不 query 柜台)。""" + try: + from ctp_trading_state import trading_state + + with _ctp_td_lock: + orders = self.list_active_orders() + positions = self._collect_positions() + trading_state.calibrate_from_lists(orders, positions) + except Exception as exc: + logger.debug("calibrate trading state: %s", exc) + def available(self) -> bool: return self._engine is not None @@ -336,6 +490,12 @@ class CtpBridge: except Exception as exc: logger.debug("gateway close: %s", exc) self._connected_mode = None + try: + from ctp_trading_state import trading_state + + trading_state.clear() + except Exception: + pass time.sleep(0.6) def _login_rejected(self, ctp_logs: list[str]) -> bool: @@ -463,6 +623,10 @@ class CtpBridge: mode, self._td_logged_in(), len(self._engine.get_all_accounts() or [])) self._schedule_fee_sync(mode) + try: + self.calibrate_trading_state() + except Exception as exc: + logger.debug("post-connect calibrate: %s", exc) _fire_position_refresh_burst() return finally: @@ -844,6 +1008,20 @@ class CtpBridge: sym = (getattr(tick, "symbol", "") or "").lower() te = getattr(tick, "exchange", None) ex_s = str(te.value if hasattr(te, "value") else te or "").upper() + price = self._price_from_tick(tick) + if price and price > 0: + try: + from ctp_trading_state import trading_state + + trading_state.set_tick_price(ex_s, sym, price) + except Exception: + pass + fn = _tick_sl_tp_callback + if fn: + try: + fn(ex_s, sym, float(price)) + except Exception as exc: + logger.debug("tick sl/tp callback: %s", exc) key = self._tick_key(sym, ex_s) bg = self._bar_generators.get(key) if not bg: @@ -1329,6 +1507,8 @@ class CtpBridge: sym = getattr(order, "symbol", "") or "" exchange = getattr(order, "exchange", None) ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "") + vt_oid = str(getattr(order, "vt_orderid", "") or "") + order_id = str(getattr(order, "orderid", "") or "") out.append({ "symbol": sym, "exchange": ex_name, @@ -1336,7 +1516,8 @@ class CtpBridge: "lots": remain, "price": float(getattr(order, "price", 0) or 0), "offset": offset_s, - "order_id": str(getattr(order, "orderid", "") or ""), + "order_id": vt_oid or order_id, + "vt_order_id": vt_oid, "status": status_s, }) return out