Add stats trading calendar and fix CTP position avg/sync.

Calendar shows daily closed trade count and PnL with emotion-day highlighting; day click loads review-first trade list. Use exchange-only entry average and improve vnpy position sync after CTP reconnect.
This commit is contained in:
dekun
2026-06-30 11:59:25 +08:00
parent d07fc4b70d
commit 8ebad6e8a2
8 changed files with 926 additions and 198 deletions
+167 -12
View File
@@ -163,13 +163,61 @@ def _fire_position_refresh_callback_debounced(*, min_interval: float = 0.35) ->
def _fire_position_refresh_burst() -> None:
"""连接后持仓回报可能分批到达,分多次触发快照刷新。"""
_fire_position_refresh_callback()
for delay in (1.5, 4.0, 10.0, 25.0):
for delay in (1.5, 4.0, 10.0, 18.0):
threading.Timer(delay, _fire_position_refresh_callback).start()
def _schedule_after_instruments_ready(bridge: "CtpBridge") -> None:
"""合约查询结束后查询持仓并校准(SimNow 登录后约 10–20s)。"""
if not getattr(bridge, "_connected_mode", None):
return
now = time.monotonic()
if now - float(getattr(bridge, "_last_instruments_ready_ts", 0) or 0) < 5.0:
return
bridge._last_instruments_ready_ts = now
def _run() -> None:
try:
if bridge._has_live_positions():
return
bridge._ensure_instrument_margin_hooks()
with _ctp_td_lock:
bridge.request_position_snapshot(force=True)
time.sleep(2.0)
with _ctp_td_lock:
bridge.calibrate_trading_state()
_fire_position_refresh_callback()
n = len(bridge._collect_positions())
logger.info("CTP 合约加载完成,持仓 %s 条,已刷新快照", n)
except Exception as exc:
logger.debug("instruments ready refresh: %s", exc)
threading.Timer(0.4, _run).start()
def _schedule_position_query_retries(bridge: "CtpBridge") -> None:
def _run() -> None:
if not bridge._connected_mode or bridge._has_live_positions():
return
try:
bridge._ensure_instrument_margin_hooks()
with _ctp_td_lock:
bridge.request_position_snapshot(force=False)
time.sleep(1.0)
with _ctp_td_lock:
bridge.calibrate_trading_state()
_fire_position_refresh_callback()
except Exception as exc:
logger.debug("position query retry: %s", exc)
for delay in POSITION_QUERY_RETRY_DELAYS_SEC:
threading.Timer(delay, _run).start()
_bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock()
_ctp_td_lock = threading.RLock()
POSITION_QUERY_MIN_INTERVAL_SEC = 5.0
POSITION_QUERY_RETRY_DELAYS_SEC = (22.0, 50.0, 95.0)
TRADE_QUERY_MIN_INTERVAL_SEC = 10.0
@@ -273,6 +321,10 @@ class CtpBridge:
self._margin_rate_lists: dict[int, list] = {}
self._margin_rate_hooked = False
self._instrument_hooked = False
self._hooks_td_api_id: Optional[int] = None
self._ctp_log_hooked = False
self._last_instruments_ready_ts: float = 0.0
self._last_position_rsp_ts: float = 0.0
self._instrument_margin_ratios: dict[str, dict[str, float]] = {}
self._margin_per_lot: dict[str, float] = {}
self._subscribed: set[str] = set()
@@ -306,6 +358,7 @@ class CtpBridge:
self._ensure_position_event_hook()
self._ensure_order_event_hook()
self._ensure_trade_event_hook()
self._ensure_ctp_log_hooks()
except ImportError:
self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
except Exception as exc:
@@ -479,18 +532,13 @@ class CtpBridge:
"td_volume": td,
}
try:
from ctp_entry_price import entry_from_ctp_pnl, round_to_tick
from ctp_trading_state import trading_state
from ctp_entry_price import round_to_tick
ths = CtpBridge._vnpy_sym_to_ths(sym, ex_name) or sym
tick = trading_state.get_tick_price(ex_name, sym)
pnl_entry = entry_from_ctp_pnl(row, tick, ths_sym=ths)
if pnl_entry and price > 0 and abs(pnl_entry - price) >= 0.5:
row["avg_price"] = pnl_entry
elif price > 0:
if price > 0:
row["avg_price"] = round_to_tick(price, ths)
except Exception as exc:
logger.debug("position avg refine: %s", exc)
logger.debug("position avg round: %s", exc)
return row
except Exception as exc:
logger.debug("position_row_from_vnpy: %s", exc)
@@ -579,6 +627,11 @@ class CtpBridge:
except Exception as exc:
logger.debug("gateway close: %s", exc)
self._connected_mode = None
self._hooks_td_api_id = None
self._instrument_hooked = False
self._margin_rate_hooked = False
self._last_position_query_ts = 0.0
self._last_instruments_ready_ts = 0.0
try:
from ctp_trading_state import trading_state
@@ -587,6 +640,27 @@ class CtpBridge:
pass
time.sleep(0.6)
def _ensure_ctp_log_hooks(self) -> None:
"""监听 vnpy 日志:合约查询成功时补触发持仓刷新(重连后 td_api 可能已换)。"""
if self._ctp_log_hooked or not self._ee:
return
try:
from vnpy.trader.event import EVENT_LOG
except ImportError:
return
bridge = self
def _on_persistent_log(event) -> None:
try:
msg = getattr(event.data, "msg", "") or str(event.data)
if "合约信息查询成功" in str(msg):
_schedule_after_instruments_ready(bridge)
except Exception as exc:
logger.debug("ctp log hook: %s", exc)
self._ee.register(EVENT_LOG, _on_persistent_log)
self._ctp_log_hooked = True
def _login_rejected(self, ctp_logs: list[str]) -> bool:
return any(
kw in m
@@ -723,7 +797,9 @@ class CtpBridge:
self.calibrate_trading_state()
except Exception as exc:
logger.debug("post-connect calibrate: %s", exc)
self._ensure_instrument_margin_hooks()
_fire_position_refresh_burst()
_schedule_position_query_retries(self)
_fire_ctp_connected_callback(mode)
return
finally:
@@ -1040,7 +1116,7 @@ class CtpBridge:
self._instrument_margin_ratios[key] = ratios
def _ensure_instrument_margin_hooks(self) -> None:
"""登录前挂钩:合约查询回报缓存保证金率;支持按需 reqQryInstrumentMarginRate"""
"""登录前挂钩:合约/持仓查询回报td_api 重建后须重新挂钩"""
if not self._engine:
return
try:
@@ -1049,9 +1125,14 @@ class CtpBridge:
except Exception:
return
bridge = self
td_id = id(td)
if td_id != self._hooks_td_api_id:
self._hooks_td_api_id = td_id
self._instrument_hooked = False
self._margin_rate_hooked = False
if not self._instrument_hooked:
orig = td.onRspQryInstrument
orig_inst = td.onRspQryInstrument
def on_instrument(data: dict, error: dict, reqid: int, last: bool) -> None:
try:
@@ -1059,9 +1140,37 @@ class CtpBridge:
bridge._cache_margin_ratio(str(data["InstrumentID"]), data)
except Exception as exc:
logger.debug("instrument margin cache: %s", exc)
return orig(data, error, reqid, last)
if last:
_schedule_after_instruments_ready(bridge)
return orig_inst(data, error, reqid, last)
td.onRspQryInstrument = on_instrument # type: ignore[method-assign]
orig_pos = td.onRspQryInvestorPosition
def on_rsp_position(
data: dict, error: dict, reqid: int, last: bool,
) -> None:
ret = orig_pos(data, error, reqid, last)
if last:
now = time.monotonic()
if now - bridge._last_position_rsp_ts < 30.0:
return ret
bridge._last_position_rsp_ts = now
def _after_position_query() -> None:
try:
time.sleep(1.5)
with _ctp_td_lock:
bridge.calibrate_trading_state()
_fire_position_refresh_callback()
except Exception as exc:
logger.debug("position rsp refresh: %s", exc)
threading.Timer(0.2, _after_position_query).start()
return ret
td.onRspQryInvestorPosition = on_rsp_position # type: ignore[method-assign]
self._instrument_hooked = True
if self._margin_rate_hooked:
@@ -1679,6 +1788,52 @@ class CtpBridge:
"""vnpy 内存缓存持仓;禁止 query_positionvnctptd 并发查询会段错误)。"""
return
def _has_live_positions(self) -> bool:
if not self._engine:
return False
try:
with _ctp_td_lock:
return len(self._collect_positions()) > 0
except Exception:
return False
def request_position_snapshot(self, *, force: bool = False) -> None:
"""合约加载后查询持仓,填充 vnpy 内存(已有持仓时跳过主动查询)。"""
if not self._engine or not self._connected_mode:
return
if not force and self._has_live_positions():
return
now = time.monotonic()
if not force and (now - self._last_position_query_ts) < POSITION_QUERY_MIN_INTERVAL_SEC:
return
try:
self._ensure_instrument_margin_hooks()
gw = self._engine.get_gateway(GATEWAY_NAME)
td = getattr(gw, "td_api", None) if gw else None
if not td or not getattr(td, "login_status", False):
logger.debug("CTP 持仓查询跳过:交易未登录")
return
if hasattr(td, "reqQryInvestorPosition"):
reqid = int(getattr(td, "reqid", 0)) + 1
td.reqid = reqid
req = {
"BrokerID": getattr(td, "brokerid", ""),
"InvestorID": getattr(td, "userid", ""),
}
with _ctp_td_lock:
ret = td.reqQryInvestorPosition(req, reqid)
if ret == 0:
self._last_position_query_ts = now
logger.info("CTP 已请求持仓查询 reqid=%s", reqid)
else:
logger.debug("CTP 持仓查询发送失败 ret=%s", ret)
elif gw and hasattr(gw, "query_position"):
gw.query_position()
self._last_position_query_ts = now
logger.info("CTP 已请求持仓查询(gateway)")
except Exception as exc:
logger.debug("request_position_snapshot: %s", exc)
def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]:
del refresh_if_empty, refresh_margin
with _ctp_td_lock: