Stream real-time position quotes via tick-driven SSE with incremental UI updates.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+35
-3
@@ -75,9 +75,13 @@ def _load_persisted_last_error() -> str:
|
||||
|
||||
_position_refresh_callback: Optional[Callable[[], None]] = None
|
||||
_tick_sl_tp_callback: Optional[Callable[[str, str, float], None]] = None
|
||||
_tick_quote_callback: Optional[Callable[[], None]] = None
|
||||
_ctp_connected_callback: Optional[Callable[[str], None]] = None
|
||||
_position_refresh_debounce_lock = threading.Lock()
|
||||
_position_refresh_debounce_ts: float = 0.0
|
||||
_tick_quote_timer: Optional[threading.Timer] = None
|
||||
_tick_quote_timer_lock = threading.Lock()
|
||||
TICK_QUOTE_DEBOUNCE_SEC = 0.12
|
||||
|
||||
|
||||
def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None:
|
||||
@@ -91,6 +95,33 @@ def set_tick_sl_tp_callback(fn: Optional[Callable[[str, str, float], None]]) ->
|
||||
_tick_sl_tp_callback = fn
|
||||
|
||||
|
||||
def set_tick_quote_callback(fn: Optional[Callable[[], None]]) -> None:
|
||||
"""注册 tick 回调:推送持仓现价/浮盈(由 bridge 侧防抖)。"""
|
||||
global _tick_quote_callback
|
||||
_tick_quote_callback = fn
|
||||
|
||||
|
||||
def _fire_tick_quote_callback_debounced() -> None:
|
||||
"""持仓品种 tick 后 trailing 防抖,批量推送现价/浮盈。"""
|
||||
global _tick_quote_timer
|
||||
|
||||
def _run() -> None:
|
||||
fn = _tick_quote_callback
|
||||
if not fn:
|
||||
return
|
||||
try:
|
||||
fn()
|
||||
except Exception as exc:
|
||||
logger.debug("tick quote callback: %s", exc)
|
||||
|
||||
with _tick_quote_timer_lock:
|
||||
if _tick_quote_timer is not None:
|
||||
_tick_quote_timer.cancel()
|
||||
_tick_quote_timer = threading.Timer(TICK_QUOTE_DEBOUNCE_SEC, _run)
|
||||
_tick_quote_timer.daemon = True
|
||||
_tick_quote_timer.start()
|
||||
|
||||
|
||||
def set_ctp_connected_callback(fn: Optional[Callable[[str], None]]) -> None:
|
||||
"""CTP 交易通道登录成功后回调(mode=simulation|live)。"""
|
||||
global _ctp_connected_callback
|
||||
@@ -319,7 +350,7 @@ class CtpBridge:
|
||||
break
|
||||
except Exception as exc:
|
||||
logger.debug("position margin cache: %s", exc)
|
||||
_fire_position_refresh_callback_debounced()
|
||||
_fire_position_refresh_callback()
|
||||
|
||||
self._ee.register(EVENT_POSITION, _on_position)
|
||||
self._position_hooked = True
|
||||
@@ -352,7 +383,7 @@ class CtpBridge:
|
||||
trading_state.upsert_order(row, notify=False)
|
||||
except Exception as exc:
|
||||
logger.debug("order event: %s", exc)
|
||||
_fire_position_refresh_callback_debounced(min_interval=0.2)
|
||||
_fire_position_refresh_callback()
|
||||
|
||||
self._ee.register(EVENT_ORDER, _on_order)
|
||||
self._order_hooked = True
|
||||
@@ -377,7 +408,7 @@ class CtpBridge:
|
||||
self._position_open_times[self._position_margin_key(sym, pd)] = dt
|
||||
except Exception as exc:
|
||||
logger.debug("trade event: %s", exc)
|
||||
_fire_position_refresh_callback_debounced(min_interval=0.2)
|
||||
_fire_position_refresh_callback()
|
||||
|
||||
self._ee.register(EVENT_TRADE, _on_trade)
|
||||
self._trade_hooked = True
|
||||
@@ -1256,6 +1287,7 @@ class CtpBridge:
|
||||
fn(ex_s, sym, float(price))
|
||||
except Exception as exc:
|
||||
logger.debug("tick sl/tp callback: %s", exc)
|
||||
_fire_tick_quote_callback_debounced()
|
||||
key = self._tick_key(sym, ex_s)
|
||||
bg = self._bar_generators.get(key)
|
||||
if not bg:
|
||||
|
||||
Reference in New Issue
Block a user