Files
qihuo/vnpy_bridge.py
T
dekun 9f48f22d16 Gate order cancel to trading hours and sync trade logs from CTP.
Disable cancel UI outside sessions, query exchange fills for records, and label local vs counterparty rows.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-26 00:35:51 +08:00

1561 lines
59 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""CTP 执行层:模拟盘 → SimNow;实盘 → 期货公司(vnpy_ctp)。"""
from __future__ import annotations
import logging
import os
import threading
import time
from collections import deque
from typing import Any, Callable, Optional
from locale_fix import ensure_process_locale
ensure_process_locale()
from ctp_settings import live_setting_dict, simnow_setting_dict
from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange
from contract_specs import get_contract_spec
logger = logging.getLogger(__name__)
# Gateway name used when looking up / connecting the gateway on the vn.py engine.
GATEWAY_NAME = "CTP"
# Total time, in seconds, that _wait_connected keeps polling for a login.
CONNECT_WAIT_SEC = 60
# Sleep, in seconds, between two polls while waiting for the login.
CONNECT_POLL_INTERVAL_SEC = 0.5
# Cooldown applied when the gateway logs show a login ban (error code 75).
LOGIN_BAN_COOLDOWN_SEC = 45 * 60
# Cooldown applied after an ordinary login failure.
LOGIN_FAIL_COOLDOWN_SEC = 5 * 60
# Setting keys under which the cooldown deadline and the last error are persisted.
CTP_COOLDOWN_UNTIL_KEY = "ctp_login_cooldown_until"
CTP_LAST_ERROR_KEY = "ctp_last_error"
def _persist_login_cooldown(seconds: float) -> None:
    """Persist a login-cooldown deadline ``seconds`` from now.

    The deadline is stored as a wall-clock timestamp string. It is only
    written when it lies later than the value already stored, so an
    existing longer cooldown is never shortened.
    """
    from fee_specs import get_setting, set_setting

    deadline = time.time() + max(0.0, seconds)
    try:
        stored = float(get_setting(CTP_COOLDOWN_UNTIL_KEY, "0") or 0)
    except (TypeError, ValueError):
        # An unparsable stored value counts as "no cooldown recorded".
        stored = 0.0
    if deadline > stored:
        set_setting(CTP_COOLDOWN_UNTIL_KEY, str(deadline))
def _persisted_login_cooldown_remaining() -> int:
    """Return the whole seconds left on the persisted cooldown (never negative).

    An unparsable stored deadline yields 0.
    """
    from fee_specs import get_setting

    try:
        deadline = float(get_setting(CTP_COOLDOWN_UNTIL_KEY, "0") or 0)
        remaining = int(deadline - time.time())
    except (TypeError, ValueError):
        return 0
    return max(0, remaining)
def _clear_persisted_login_cooldown() -> None:
    """Reset the persisted cooldown deadline to "0"."""
    from fee_specs import set_setting as store_setting

    store_setting(CTP_COOLDOWN_UNTIL_KEY, "0")
def _persist_last_error(msg: str) -> None:
    """Persist the latest CTP error text, stripped; a falsy ``msg`` is stored as ""."""
    from fee_specs import set_setting

    text = msg or ""
    set_setting(CTP_LAST_ERROR_KEY, text.strip())
def _load_persisted_last_error() -> str:
    """Return the persisted CTP error text, stripped ("" when nothing is stored)."""
    from fee_specs import get_setting

    stored = get_setting(CTP_LAST_ERROR_KEY, "")
    if not stored:
        return ""
    return stored.strip()
# Callback fired (on its own thread) by _fire_position_refresh_callback; None disables it.
_position_refresh_callback: Optional[Callable[[], None]] = None


def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None:
    """Register ``fn`` as the module-wide position-refresh callback.

    Passing ``None`` removes the current callback.
    """
    global _position_refresh_callback
    _position_refresh_callback = fn
def _fire_position_refresh_callback() -> None:
    """Run the registered position-refresh callback on a daemon thread, if one is set.

    Failure to start the thread is logged at debug level and otherwise ignored.
    """
    callback = _position_refresh_callback
    if callback:
        try:
            worker = threading.Thread(
                target=callback, daemon=True, name="ctp-position-refresh"
            )
            worker.start()
        except Exception as exc:
            logger.debug("position refresh callback: %s", exc)
# Module-level slot for a CtpBridge instance and a lock next to it.
# NOTE(review): presumably a lazily created singleton guarded by the lock;
# the accessor is not visible in this part of the file - confirm.
_bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock()
def _simnow_setting() -> dict[str, str]:
    """Return the SimNow simulation front settings (system settings first, .env as fallback)."""
    settings = simnow_setting_dict()
    return settings
def _live_setting() -> dict[str, str]:
    """Return the connection settings produced by ``live_setting_dict``."""
    settings = live_setting_dict()
    return settings
def _setting_for_mode(mode: str) -> dict[str, str]:
    """Return the SimNow settings for mode "simulation", the live settings otherwise."""
    if mode == "simulation":
        return _simnow_setting()
    return _live_setting()
def _mode_label(mode: str) -> str:
return "SimNow" if mode == "simulation" else "期货公司实盘"
def _parse_tcp_address(address: str) -> tuple[str, int]:
raw = (address or "").strip()
if raw.startswith("tcp://"):
raw = raw[6:]
if ":" not in raw:
raise ValueError(f"无效 TCP 地址: {address}")
host, port_s = raw.rsplit(":", 1)
return host, int(port_s)
def probe_tcp_address(address: str, timeout: float = 5.0) -> tuple[bool, str]:
    """Check whether the CTP front TCP endpoint accepts a connection.

    Returns:
        ``(True, "")`` when a TCP connection could be opened, otherwise
        ``(False, <error text>)``. Any exception, including a malformed
        address, is reported through the second element instead of raised.
    """
    import socket

    try:
        endpoint = _parse_tcp_address(address)
        with socket.create_connection(endpoint, timeout=timeout):
            pass
    except Exception as exc:
        return False, str(exc)
    return True, ""
def _format_ctp_failure(ctp_logs: list[str], *, td_address: str = "") -> str:
"""根据 CTP 网关日志拼出可读错误。"""
if td_address:
ok, err = probe_tcp_address(td_address, timeout=4.0)
if not ok:
return (
f"SimNow 交易前置不可达:{td_address}{err})。"
"182.254.243.31 已停用,请改 .env 为官方前置 "
"tcp://180.168.146.187:10201 / 10211,并确认服务器能访问该地址。"
)
text = "\n".join(ctp_logs)
if "连续登录失败" in text or "登录被禁止" in text or "代码:75" in text:
return (
"CTP 登录被临时禁止:连续失败次数过多(错误码 75)。"
"请等待约 30~60 分钟后再试,先用快期确认投资者代码与密码正确,期间勿反复点「连接」。"
)
if "4097" in text or "Decrypt handshake" in text or "shake hand" in text.lower():
return (
"CTP 握手失败(4097)vnpy_ctp 与 SimNow 前置加密不匹配。"
"请执行 pip install -U vnpy vnpy_ctp 后重启,并确认 .env 中 SIMNOW_ENV=实盘"
)
if "不合法的登录" in text or "密码" in text or "账号" in text:
tail = ctp_logs[-1] if ctp_logs else ""
return f"CTP 登录被拒:{tail or '请检查投资者代码与密码(快期能否登录)'}"
if "连接断开" in text or "disconnect" in text.lower():
tail = ctp_logs[-1] if ctp_logs else ""
return f"CTP 连接断开:{tail or '请检查前置地址与网络'}"
if ctp_logs:
return f"CTP 连接失败:{ctp_logs[-1]}"
return "CTP 连接超时:未收到柜台回报。请检查 SimNow 账号、前置地址、网络(nc 测端口),并用快期验证账号"
def round_to_tick(price: float, tick: float) -> float:
    """Round ``price`` to the nearest multiple of ``tick``.

    A non-positive ``tick`` disables rounding and the price is returned as
    a float. The result is rounded to 10 decimals to hide float noise.
    """
    if tick <= 0:
        return float(price)
    tick_count = round(float(price) / tick)
    return round(tick_count * tick, 10)
def _is_long_direction(direction_obj: Any) -> bool:
s = str(direction_obj or "")
return "LONG" in s.upper() or "" in s
class CtpBridge:
def __init__(self) -> None:
self._engine = None
self._ee = None
self._connected_mode: Optional[str] = None
self._last_error: str = ""
self._connect_lock = threading.Lock()
self._connect_in_progress = False
self._login_cooldown_until: float = 0.0
self._restore_persisted_state()
self._commission_waiters: dict[int, threading.Event] = {}
self._commission_lists: dict[int, list] = {}
self._commission_hooked = False
self._subscribed: set[str] = set()
self._last_position_query_ts: float = 0.0
self._position_margins: dict[str, float] = {}
self._position_open_times: dict[str, str] = {}
self._margin_hooked = False
self._trade_hooked = False
self._trade_query_results: list[dict[str, Any]] = []
self._trade_query_event = threading.Event()
self._last_trade_query_ts: float = 0.0
self._tick_hooked = False
self._bar_generators: dict[str, Any] = {}
self._bars_1m: dict[str, deque] = {}
self._init_engine()
def _init_engine(self) -> None:
    """Create the vn.py event engine and main engine and register the CTP gateway.

    Nothing is raised: a missing package or any other failure is recorded
    in ``_last_error`` instead.
    """
    ensure_process_locale()
    try:
        from vnpy.event import EventEngine
        from vnpy.trader.engine import MainEngine
        from vnpy_ctp import CtpGateway

        self._ee = EventEngine()
        self._engine = MainEngine(self._ee)
        self._engine.add_gateway(CtpGateway)
    except Exception as exc:
        if isinstance(exc, ImportError):
            self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
        else:
            self._last_error = str(exc)
def available(self) -> bool:
    """Return True when the vn.py main engine was created."""
    engine = self._engine
    return engine is not None
@property
def last_error(self) -> str:
return self._last_error
@property
def connected_mode(self) -> Optional[str]:
return self._connected_mode
def connect_in_progress(self) -> bool:
    """Return the flag that is set while ``connect`` is running."""
    busy = self._connect_in_progress
    return busy
def _restore_persisted_state(self) -> None:
    """Load the persisted last error and login cooldown into this instance."""
    persisted_error = _load_persisted_last_error()
    if persisted_error:
        self._last_error = persisted_error
    remaining = _persisted_login_cooldown_remaining()
    if remaining > 0:
        # The in-memory deadline is kept on the monotonic clock.
        self._login_cooldown_until = time.monotonic() + remaining
def login_cooldown_remaining(self) -> int:
    """Return the seconds left before another login is allowed.

    The larger of the in-memory and the persisted cooldown wins, so the
    cooldown survives a process restart.
    """
    in_memory = int(self._login_cooldown_until - time.monotonic())
    persisted = _persisted_login_cooldown_remaining()
    return max(0, in_memory, persisted)
def _is_login_cooldown_active(self) -> bool:
return self.login_cooldown_remaining() > 0
def _set_login_cooldown(self, seconds: float) -> None:
    """Start a login cooldown of ``seconds``, in memory and in the settings store.

    The in-memory deadline is only moved later, never earlier.
    """
    deadline = time.monotonic() + max(0.0, seconds)
    if deadline > self._login_cooldown_until:
        self._login_cooldown_until = deadline
    # The persisted deadline applies its own "only extend" rule.
    _persist_login_cooldown(seconds)
def _clear_login_cooldown(self) -> None:
    """Drop the login cooldown both in memory and in the settings store."""
    _clear_persisted_login_cooldown()
    self._login_cooldown_until = 0.0
def _apply_login_failure_cooldown(self, ctp_logs: list[str]) -> None:
    """Start a cooldown that matches the failure seen in the gateway logs.

    A login ban (code 75) gets the long cooldown, an ordinary login
    failure the short one; other logs leave the cooldown untouched.
    """
    joined = "\n".join(ctp_logs)
    ban_markers = ("连续登录失败", "登录被禁止", "代码:75")
    if any(marker in joined for marker in ban_markers):
        self._set_login_cooldown(LOGIN_BAN_COOLDOWN_SEC)
        return
    for line in ctp_logs:
        if "登录失败" in line or "不合法的登录" in line:
            self._set_login_cooldown(LOGIN_FAIL_COOLDOWN_SEC)
            return
def _login_cooldown_message(self) -> str:
remain = self.login_cooldown_remaining()
return (
f"CTP 登录冷却中,请 {remain // 60}{remain % 60} 秒后再试"
f"(避免连续失败被 SimNow 封禁)"
)
def _close_gateway(self) -> None:
    """Close the CTP gateway so a half-open session cannot block the next login.

    Errors while closing are logged at debug level. Afterwards the bridge
    is marked disconnected and the call sleeps 0.6 seconds.
    """
    if not self._engine:
        return
    try:
        gateway = self._engine.get_gateway(GATEWAY_NAME)
        if gateway:
            gateway.close()
    except Exception as exc:
        logger.debug("gateway close: %s", exc)
    self._connected_mode = None
    time.sleep(0.6)
def _login_rejected(self, ctp_logs: list[str]) -> bool:
return any(
kw in m
for m in ctp_logs
for kw in ("登录失败", "不合法的登录", "登录被禁止", "连续登录失败")
)
def _wait_connected(self, mode: str, ctp_logs: list[str] | None = None) -> bool:
"""等待账户回报或交易通道登录成功。"""
if not self._engine:
return False
logs = ctp_logs or []
loops = max(1, int(CONNECT_WAIT_SEC / CONNECT_POLL_INTERVAL_SEC))
for _ in range(loops):
if self._login_rejected(logs):
return False
try:
if self._engine.get_all_accounts():
return True
except Exception:
pass
if self._td_logged_in():
return True
time.sleep(CONNECT_POLL_INTERVAL_SEC)
return False
def status(self, mode: str) -> dict[str, Any]:
    """Return a snapshot of the bridge state for ``mode``.

    When the bridge believes it is connected in ``mode`` the connection is
    pinged first, which may clear the connected flag. The result reports
    install / connection flags, missing configuration keys, the last error
    (in-memory first, persisted as fallback) and the login cooldown.
    """
    if self._connected_mode == mode:
        self.ping()
    setting = _setting_for_mode(mode)
    missing_keys = []
    for required in ("用户名", "密码", "交易服务器"):
        if not setting.get(required):
            missing_keys.append(required)
    cooldown_sec = self.login_cooldown_remaining()
    # A connect attempt is only reported while no cooldown is pending.
    is_connecting = bool(self._connect_in_progress and cooldown_sec <= 0)
    error_text = self._last_error or _load_persisted_last_error()
    snapshot: dict[str, Any] = {}
    snapshot["vnpy_installed"] = self.available()
    snapshot["connected"] = self._connected_mode == mode
    snapshot["connecting"] = is_connecting
    snapshot["connected_mode"] = self._connected_mode
    snapshot["mode_label"] = _mode_label(mode)
    snapshot["missing_config"] = missing_keys
    snapshot["last_error"] = error_text
    snapshot["login_cooldown_sec"] = cooldown_sec
    snapshot["broker_id"] = setting.get("经纪商代码", "")
    snapshot["td_address"] = setting.get("交易服务器", "")
    return snapshot
def connect(self, mode: str, *, force: bool = False) -> None:
    """Connect the CTP gateway for ``mode`` and block until login succeeds or fails.

    Args:
        mode: "simulation" for SimNow; any other value selects the live settings.
        force: Ignore an active login cooldown and an existing healthy
            connection, closing the gateway first.

    Raises:
        RuntimeError: a connect is already running, a login cooldown is
            active, the engine is missing, the trading front is unreachable,
            or the login did not complete.
        ValueError: user name, password or trading server is not configured.
    """
    if self._connect_in_progress:
        raise RuntimeError("CTP 正在连接中,请稍候")
    if self._is_login_cooldown_active() and not force:
        msg = self._login_cooldown_message()
        self._last_error = msg
        raise RuntimeError(msg)
    if not self._engine:
        raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
    if self._connected_mode == mode and not force:
        # Already connected in this mode: keep the session if it still answers.
        if self.ping():
            return
        self._connected_mode = None
    setting = _setting_for_mode(mode)
    if not setting.get("用户名") or not setting.get("密码"):
        raise ValueError(
            f"{_mode_label(mode)}:请在 .env 配置 "
            f"{'SIMNOW_USER / SIMNOW_PASSWORD' if mode == 'simulation' else 'CTP_LIVE_USER / CTP_LIVE_PASSWORD'}"
        )
    if not setting.get("交易服务器"):
        raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
    self._connect_in_progress = True
    try:
        with self._connect_lock:
            if force and self._connected_mode:
                self._close_gateway()
            elif self._connected_mode and self._connected_mode != mode:
                # Switching modes: shut the whole engine down first.
                try:
                    self._engine.close()
                except Exception:
                    pass
                self._connected_mode = None
                time.sleep(1)
            elif not (self._connected_mode == mode and self.ping()):
                self._close_gateway()
            ctp_logs: list[str] = []
            from vnpy.trader.event import EVENT_LOG

            def _on_log(event) -> None:
                # Keep the most recent 40 gateway log lines for diagnostics.
                msg = getattr(event.data, "msg", "") or str(event.data)
                if msg:
                    ctp_logs.append(str(msg))
                    if len(ctp_logs) > 40:
                        ctp_logs.pop(0)
                    logger.info("CTP | %s", msg)

            self._ee.register(EVENT_LOG, _on_log)
            try:
                ensure_process_locale()
                logger.info(
                    "CTP 连接 [%s] user=%s td=%s env=%s",
                    mode,
                    setting.get("用户名"),
                    setting.get("交易服务器"),
                    setting.get("柜台环境", "实盘"),
                )
                td_addr = setting.get("交易服务器", "")
                ok, err = probe_tcp_address(td_addr, timeout=5.0)
                if not ok:
                    # NOTE(review): this path raises without updating
                    # _last_error or the persisted error - confirm intended.
                    raise RuntimeError(
                        # The probe error is wrapped in balanced parentheses.
                        f"SimNow 交易前置不可达:{td_addr}({err})。"
                        "请更新 .env 中 SIMNOW_TD_ADDRESS 为官网最新地址,"
                        "并在服务器执行 nc -zv 验证出网。"
                    )
                self._engine.connect(setting, GATEWAY_NAME)
                if self._wait_connected(mode, ctp_logs):
                    self._connected_mode = mode
                    self._last_error = ""
                    _persist_last_error("")
                    self._clear_login_cooldown()
                    logger.info("CTP 已连接 [%s] td_login=%s accounts=%s",
                                mode, self._td_logged_in(),
                                len(self._engine.get_all_accounts() or []))
                    self._install_position_margin_hook()
                    self._schedule_fee_sync(mode)
                    try:
                        self.refresh_positions()
                    except Exception as exc:
                        logger.debug("initial position query: %s", exc)
                    _fire_position_refresh_callback()
                    return
            finally:
                self._ee.unregister(EVENT_LOG, _on_log)
            # Login did not complete: clean up, record why, and report it.
            self._close_gateway()
            self._apply_login_failure_cooldown(ctp_logs)
            hint = _format_ctp_failure(ctp_logs, td_address=setting.get("交易服务器", ""))
            self._last_error = hint
            _persist_last_error(hint)
            logger.warning("CTP 连接失败 [%s]: %s | logs=%s", mode, hint, ctp_logs[-5:])
            raise RuntimeError(hint)
    finally:
        self._connect_in_progress = False
def start_connect_async(self, mode: str, *, force: bool = False) -> dict[str, Any]:
    """Start ``connect`` on a daemon thread so the HTTP request is not blocked.

    Returns:
        A dict with "started", "connecting" and "connected" flags, plus
        "cooldown": True when a login cooldown prevented the attempt.
    """
    if self._connected_mode == mode and self.ping() and not force:
        return {"started": False, "connecting": False, "connected": True}
    if self._connect_in_progress:
        return {"started": False, "connecting": True, "connected": False}
    if self._is_login_cooldown_active() and not force:
        self._last_error = self._login_cooldown_message()
        outcome = {"started": False, "connecting": False, "connected": False}
        outcome["cooldown"] = True
        return outcome

    def _connect_worker() -> None:
        # Failures are already recorded by connect(); just log them here.
        try:
            self.connect(mode, force=force)
        except Exception as exc:
            logger.warning("CTP 后台连接失败: %s", exc)

    worker = threading.Thread(
        target=_connect_worker, daemon=True, name="ctp-connect-async"
    )
    worker.start()
    return {"started": True, "connecting": True, "connected": False}
def ensure_connected(self, mode: str) -> None:
    """Connect in ``mode`` unless a connection in that mode still answers a ping."""
    already_up = self._connected_mode == mode and self.ping()
    if not already_up:
        self.connect(mode)
def require_connected(self, mode: str) -> None:
    """Pre-order check: the bridge must already be connected and logged in.

    This never starts a blocking connect itself.

    Raises:
        RuntimeError: a connect is in progress, the bridge is not connected
            in ``mode``, or the trading channel is not logged in.
    """
    if self._connect_in_progress:
        raise RuntimeError("CTP 连接中,请稍候再下单")
    connected_here = self._connected_mode == mode and self.ping()
    if not connected_here:
        raise RuntimeError("请先连接 CTP(持仓监控页点击「连接 CTP」)")
    if not self._td_logged_in():
        raise RuntimeError("CTP 交易通道未登录,请重连 CTP 后再下单")
def _td_logged_in(self) -> bool:
    """Return the gateway trading API's ``login_status``; False on any error."""
    try:
        td_api = self._engine.get_gateway(GATEWAY_NAME).td_api
        logged_in = getattr(td_api, "login_status", False)
        return bool(logged_in)
    except Exception:
        return False
def _find_position(self, sym: str, ex_name: str, hold_direction: str) -> Any:
    """Return the first non-empty engine position matching symbol, exchange and side.

    Symbol and exchange are compared case-insensitively. ``hold_direction``
    "long" selects long positions, anything else short ones. Returns None
    when nothing matches or the lookup fails.
    """
    if not self._engine:
        return None
    wanted_symbol = sym.lower()
    wanted_exchange = ex_name.upper()
    wants_long = hold_direction == "long"
    try:
        for candidate in self._engine.get_all_positions():
            symbol = (getattr(candidate, "symbol", "") or "").lower()
            exchange = getattr(candidate, "exchange", None)
            exchange_text = str(
                exchange.value if hasattr(exchange, "value") else exchange or ""
            ).upper()
            if symbol != wanted_symbol or exchange_text != wanted_exchange:
                continue
            if int(getattr(candidate, "volume", 0) or 0) <= 0:
                continue
            holds_long = _is_long_direction(getattr(candidate, "direction", None))
            if holds_long == wants_long:
                return candidate
    except Exception as exc:
        logger.debug("find position: %s", exc)
    return None
def _resolve_close_offset(self, sym: str, ex_name: str, hold_direction: str, lots: int) -> Any:
    """Choose the vn.py ``Offset`` to use when closing ``lots`` of a position.

    Exchanges outside CZCE / CFFEX / SHFE / INE always get ``Offset.CLOSE``.
    For the listed exchanges the position's today volume (volume minus
    yd_volume) decides between CLOSETODAY and CLOSEYESTERDAY.
    """
    from vnpy.trader.constant import Offset

    exchange_code = (ex_name or "").upper()
    # SHFE / INE / CZCE / CFFEX need close-today vs close-yesterday; others
    # (e.g. DCE) can use the generic CLOSE.
    if exchange_code not in ("CZCE", "CFFEX", "SHFE", "INE"):
        return Offset.CLOSE
    position = self._find_position(sym, exchange_code, hold_direction)
    if not position:
        # Without position detail, prefer close-today for a position opened
        # in the day session (avoids SHFE "insufficient yesterday position").
        return Offset.CLOSETODAY if exchange_code in ("SHFE", "INE", "CZCE") else Offset.CLOSE
    total_volume = int(getattr(position, "volume", 0) or 0)
    yesterday_volume = int(getattr(position, "yd_volume", 0) or 0)
    today_volume = max(0, total_volume - yesterday_volume)
    return Offset.CLOSETODAY if today_volume >= lots else Offset.CLOSEYESTERDAY
def _aggressive_limit_price(
    self,
    ths_code: str,
    sym: str,
    ex_name: str,
    direction: Any,
    tick: float,
    fallback: float,
) -> float:
    """Return a limit price pushed through the market by a slippage allowance.

    The base is the latest tick price when one is available, otherwise
    ``fallback``. A long order adds the slippage; any other direction
    subtracts it but never goes below one tick. The result is rounded to
    the tick size.
    """
    from vnpy.trader.constant import Direction

    self.subscribe_symbol(ths_code)
    detail = self.get_tick_detail(ths_code, mode=self._connected_mode or "")
    base_price = float(detail["price"]) if detail.get("price") else fallback
    slippage = max(tick, tick * 3)
    if direction == Direction.LONG:
        limit_price = base_price + slippage
    else:
        limit_price = max(tick, base_price - slippage)
    return round_to_tick(limit_price, tick)
def ping(self) -> bool:
    """Check whether the connection is still usable.

    The connection counts as alive while the engine reports at least one
    account. Otherwise the connected state is cleared and False is returned.
    """
    if not self._engine or not self._connected_mode:
        return False
    alive = False
    try:
        alive = bool(self._engine.get_all_accounts())
    except Exception as exc:
        logger.debug("CTP ping failed: %s", exc)
    if alive:
        return True
    self._connected_mode = None
    return False
def mark_disconnected(self) -> None:
    """Forget the connected mode so the bridge reports itself as disconnected."""
    self._connected_mode = None
def reconnect_after_settings_saved(self, mode: str) -> dict[str, Any]:
    """Close the old connection and reconnect after front / account settings were saved.

    The last error is cleared in memory and in the settings store, then a
    forced background connect is started; its result dict is returned.
    """
    self._close_gateway()
    self._last_error = ""
    _persist_last_error("")
    outcome = self.start_connect_async(mode, force=True)
    return outcome
def _schedule_fee_sync(self, mode: str) -> None:
    """Trigger the daily fee-sync check after a successful connect.

    A daemon thread waits 45 seconds and then calls
    ``try_daily_ctp_fee_sync`` with ``force=False``; failures are only
    logged at debug level.
    """
    def _read_setting(key: str, default: str = "") -> str:
        from fee_specs import get_setting
        return get_setting(key, default)

    def _write_setting(key: str, val: str) -> None:
        from fee_specs import set_setting
        set_setting(key, val)

    def _worker() -> None:
        time.sleep(45)
        try:
            from ctp_fee_worker import try_daily_ctp_fee_sync
            try_daily_ctp_fee_sync(
                mode,
                get_setting=_read_setting,
                set_setting=_write_setting,
                force=False,
            )
        except Exception as exc:
            logger.debug("CTP 手续费连接后检查: %s", exc)

    checker = threading.Thread(target=_worker, daemon=True, name="ctp-fee-sync-check")
    checker.start()
def _ensure_commission_callback(self) -> None:
    """Install, once, the handler for commission-rate query responses.

    The handler is assigned to the trading API's
    ``onRspQryInstrumentCommissionRate``. It collects rows per request id
    and sets the waiting event when the last row of a request arrives.
    """
    if self._commission_hooked or not self._engine:
        return
    try:
        td_api = self._engine.get_gateway(GATEWAY_NAME).td_api
    except Exception:
        return
    owner = self

    def _on_commission_rsp(data: dict, error: dict, reqid: int, last: bool) -> None:
        error_id = int(error.get("ErrorID") or 0) if error else 0
        if error_id != 0:
            logger.debug(
                "CTP commission error reqid=%s: %s",
                reqid,
                error.get("ErrorMsg") or error,
            )
        if data and data.get("InstrumentID"):
            rows = owner._commission_lists.setdefault(reqid, [])
            rows.append(dict(data))
        waiter = owner._commission_waiters.get(reqid)
        if last and waiter:
            waiter.set()

    td_api.onRspQryInstrumentCommissionRate = _on_commission_rsp  # type: ignore[method-assign]
    self._commission_hooked = True
def _query_commission(
self,
*,
mode: str,
instrument_id: str = "",
exchange_id: str = "",
timeout: float = 8,
) -> list[dict]:
if self._connected_mode != mode or not self._engine:
return []
try:
gw = self._engine.get_gateway(GATEWAY_NAME)
td = gw.td_api
except Exception as exc:
logger.debug("commission query init: %s", exc)
return []
if not getattr(td, "login_status", False):
return []
if not hasattr(td, "reqQryInstrumentCommissionRate"):
return []
self._ensure_commission_callback()
reqid = int(getattr(td, "reqid", 0)) + 1
td.reqid = reqid
ev = threading.Event()
self._commission_waiters[reqid] = ev
req = {
"BrokerID": td.brokerid,
"InvestorID": td.userid,
"InstrumentID": instrument_id or "",
"ExchangeID": exchange_id or "",
}
ret = td.reqQryInstrumentCommissionRate(req, reqid)
if ret != 0:
self._commission_waiters.pop(reqid, None)
return []
ev.wait(timeout=timeout)
self._commission_waiters.pop(reqid, None)
return self._commission_lists.pop(reqid, [])
def query_instrument_commission(self, ths_code: str, *, mode: str) -> dict:
    """Query the CTP commission rate of one contract (requires a connection).

    Returns the last row received, or an empty dict when the code cannot be
    converted or no row came back.
    """
    try:
        symbol, exchange_name = ths_to_vnpy_symbol(ths_code)
    except Exception:
        return {}
    rows = self._query_commission(
        mode=mode,
        instrument_id=symbol,
        exchange_id=exchange_name,
    )
    if not rows:
        return {}
    return rows[-1]
def query_all_commissions(self, *, mode: str) -> list[dict]:
    """Query commission rates for all contracts (instrument id left empty), waiting up to 45 s."""
    rows = self._query_commission(mode=mode, timeout=45)
    return rows
def _tick_key(self, symbol: str, ex_name: str) -> str:
return f"{symbol.lower()}:{ex_name.upper()}"
def _price_from_tick(self, tick: Any) -> Optional[float]:
for attr in ("last_price", "bid_price_1", "ask_price_1", "pre_close"):
try:
v = float(getattr(tick, attr, 0) or 0)
except (TypeError, ValueError):
v = 0.0
if v > 0:
return v
return None
def _lookup_tick(self, symbol: str, ex_name: str) -> Optional[float]:
if not self._engine:
return None
sym_l = symbol.lower()
ex_u = ex_name.upper()
try:
for tick in self._engine.get_all_ticks():
ts = (getattr(tick, "symbol", "") or "").lower()
te = getattr(tick, "exchange", None)
te_s = str(te.value if hasattr(te, "value") else te or "").upper()
if ts == sym_l and te_s == ex_u:
p = self._price_from_tick(tick)
if p:
return p
except Exception as exc:
logger.debug("lookup tick: %s", exc)
return None
def _bar_to_dict(self, bar: Any) -> dict:
dt = getattr(bar, "datetime", None)
d_str = dt.strftime("%Y-%m-%d %H:%M:%S") if dt else ""
return {
"d": d_str,
"o": float(getattr(bar, "open_price", 0) or 0),
"h": float(getattr(bar, "high_price", 0) or 0),
"l": float(getattr(bar, "low_price", 0) or 0),
"c": float(getattr(bar, "close_price", 0) or 0),
"v": float(getattr(bar, "volume", 0) or 0),
}
def _ensure_bar_generator(self, sym: str, ex_name: str) -> None:
key = self._tick_key(sym, ex_name)
if key in self._bar_generators:
return
self._bars_1m[key] = deque(maxlen=4000)
def on_bar(bar: Any) -> None:
row = self._bar_to_dict(bar)
if row.get("d"):
self._bars_1m[key].append(row)
try:
from vnpy.trader.utility import BarGenerator
self._bar_generators[key] = BarGenerator(on_bar=on_bar)
except ImportError:
logger.debug("BarGenerator unavailable")
def _find_tick(self, symbol: str, ex_name: str) -> Any:
if not self._engine:
return None
sym_l = symbol.lower()
ex_u = ex_name.upper()
try:
for tick in self._engine.get_all_ticks():
ts = (getattr(tick, "symbol", "") or "").lower()
te = getattr(tick, "exchange", None)
te_s = str(te.value if hasattr(te, "value") else te or "").upper()
if ts == sym_l and te_s == ex_u:
return tick
except Exception as exc:
logger.debug("find tick: %s", exc)
return None
def _tick_to_bar(self, symbol: str, ex_name: str) -> Optional[dict]:
tick = self._find_tick(symbol, ex_name)
if not tick:
return None
lp = self._price_from_tick(tick)
if not lp or lp <= 0:
return None
dt = getattr(tick, "datetime", None)
d_str = dt.strftime("%Y-%m-%d %H:%M:%S") if dt else ""
if not d_str:
from datetime import datetime
from zoneinfo import ZoneInfo
d_str = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
o = float(getattr(tick, "open_price", 0) or lp)
h = float(getattr(tick, "high_price", 0) or lp)
lo = float(getattr(tick, "low_price", 0) or lp)
return {
"d": d_str,
"o": o,
"h": h,
"l": lo,
"c": lp,
"v": float(getattr(tick, "volume", 0) or 0),
}
def _on_tick(self, tick: Any) -> None:
sym = (getattr(tick, "symbol", "") or "").lower()
te = getattr(tick, "exchange", None)
ex_s = str(te.value if hasattr(te, "value") else te or "").upper()
key = self._tick_key(sym, ex_s)
bg = self._bar_generators.get(key)
if not bg:
return
try:
bg.update_tick(tick)
except Exception as exc:
logger.debug("bar gen tick: %s", exc)
def _ensure_tick_handler(self) -> None:
if self._tick_hooked or not self._ee:
return
try:
from vnpy.trader.event import EVENT_TICK
except ImportError:
return
def process_tick(event: Any) -> None:
self._on_tick(event.data)
self._ee.register(EVENT_TICK, process_tick)
self._tick_hooked = True
def get_kline_bars_1m(self, ths_code: str, *, mode: str) -> list[dict]:
    """Subscribe to a contract and return its 1-minute bars, including the forming one.

    Waits up to 12 polls of 0.2 s for a finished bar or a priced tick. The
    bar still being built replaces or follows the last finished bar,
    depending on whether their timestamps match. With no bars at all, a
    single bar built from the latest tick is returned when possible.
    """
    if self._connected_mode != mode or not self._engine:
        return []
    try:
        sym, ex_name = ths_to_vnpy_symbol(ths_code)
    except Exception:
        return []
    key = self._tick_key(sym, ex_name)
    self._ensure_bar_generator(sym, ex_name)
    self.subscribe_symbol(ths_code)
    for _attempt in range(12):
        if self._bars_1m.get(key) or self._lookup_tick(sym, ex_name):
            break
        time.sleep(0.2)
    bars = list(self._bars_1m.get(key, []))
    generator = self._bar_generators.get(key)
    if generator and getattr(generator, "bar", None):
        forming = self._bar_to_dict(generator.bar)
        if forming.get("d"):
            if bars and bars[-1]["d"] == forming["d"]:
                bars[-1] = forming
            else:
                bars.append(forming)
    if not bars:
        tick_bar = self._tick_to_bar(sym, ex_name)
        if tick_bar:
            bars = [tick_bar]
    return bars
def get_tick_detail(self, ths_code: str, *, mode: str) -> dict[str, Any]:
    """Return the latest price and previous close of a contract.

    Subscribes first, then polls up to 8 times (0.2 s apart) for a tick.
    Returns ``{"price", "pre_close"}`` for the first tick found, with
    ``pre_close`` set to None unless it is positive, or an empty dict when
    not connected in ``mode``, the code is invalid, or no tick arrives.
    """
    if self._connected_mode != mode or not self._engine:
        return {}
    try:
        sym, ex_name = ths_to_vnpy_symbol(ths_code)
    except Exception:
        return {}
    self.subscribe_symbol(ths_code)
    for _attempt in range(8):
        tick = self._find_tick(sym, ex_name)
        if not tick:
            time.sleep(0.2)
            continue
        price = self._price_from_tick(tick)
        try:
            previous_close = float(getattr(tick, "pre_close", 0) or 0)
        except (TypeError, ValueError):
            previous_close = 0.0
        return {
            "price": price,
            "pre_close": previous_close if previous_close > 0 else None,
        }
    return {}
def subscribe_symbol(self, ths_code: str) -> None:
    """Subscribe to market data for a contract, once per contract.

    Does nothing while disconnected. The bar generator is ensured on every
    call; the subscription itself is only sent the first time. All errors
    are logged at debug level.
    """
    if not self._engine or not self._connected_mode:
        return
    try:
        from vnpy.trader.object import SubscribeRequest

        sym, ex_name = ths_to_vnpy_symbol(ths_code)
        key = self._tick_key(sym, ex_name)
        self._ensure_bar_generator(sym, ex_name)
        if key not in self._subscribed:
            exchange = to_vnpy_exchange(ex_name)
            self._ensure_tick_handler()
            request = SubscribeRequest(symbol=sym, exchange=exchange)
            self._engine.subscribe(request, GATEWAY_NAME)
            self._subscribed.add(key)
    except Exception as exc:
        logger.debug("CTP subscribe %s: %s", ths_code, exc)
def get_tick_price(self, ths_code: str, *, mode: str) -> Optional[float]:
    """Return the latest usable price of a contract, subscribing if needed.

    A cached tick is used when available. Otherwise the contract is
    subscribed and the cache polled up to 8 times, 0.2 s apart. Returns
    None when not connected in ``mode``, the code is invalid, or no price
    arrives.
    """
    if self._connected_mode != mode or not self._engine:
        return None
    try:
        sym, ex_name = ths_to_vnpy_symbol(ths_code)
    except Exception:
        return None
    cached = self._lookup_tick(sym, ex_name)
    if cached:
        return cached
    self.subscribe_symbol(ths_code)
    attempts_left = 8
    while attempts_left > 0:
        attempts_left -= 1
        time.sleep(0.2)
        fresh = self._lookup_tick(sym, ex_name)
        if fresh:
            return fresh
    return None
def get_account(self) -> dict[str, Any]:
    """Return balance, available, frozen and account id of the first account.

    Returns an empty dict when there is no engine or no account yet.
    """
    if not self._engine:
        return {}
    accounts = self._engine.get_all_accounts()
    if not accounts:
        return {}
    first = accounts[0]
    summary: dict[str, Any] = {
        field: float(getattr(first, field, 0) or 0)
        for field in ("balance", "available", "frozen")
    }
    summary["accountid"] = getattr(first, "accountid", "")
    return summary
def _position_margin_key(self, sym: str, direction: str) -> str:
return f"{(sym or '').lower()}:{(direction or 'long').strip().lower()}"
def _lookup_position_open_time(self, sym: str, direction: str) -> str:
return (self._position_open_times.get(self._position_margin_key(sym, direction)) or "").strip()
@staticmethod
def _parse_ctp_open_datetime(date_raw: str, time_raw: str = "") -> str:
"""CTP OpenDate + OpenTime → YYYY-MM-DD HH:MM[:SS]。"""
d = (date_raw or "").strip()
if len(d) >= 8 and d[:8].isdigit():
date_part = f"{d[:4]}-{d[4:6]}-{d[6:8]}"
else:
return ""
t = (time_raw or "").strip().replace(":", "")
if len(t) >= 6 and t[:6].isdigit():
return f"{date_part} {t[0:2]}:{t[2:4]}:{t[4:6]}"
if len(t) >= 4 and t.isdigit():
return f"{date_part} {t[0:2]}:{t[2:4]}"
return date_part
def _parse_ctp_open_date(raw: str) -> str:
return CtpBridge._parse_ctp_open_datetime(raw, "")
def _install_position_margin_hook(self) -> None:
    """Wrap the trading API's position-query callback to cache margin and open time.

    The wrapper records, per "<symbol>:<direction>" key, the summed
    UseMargin (ExchangeMargin as fallback) and the earliest open datetime,
    then delegates to the original callback. Installed at most once; any
    failure is logged at debug level.
    """
    if self._margin_hooked or not self._engine:
        return
    try:
        gw = self._engine.get_gateway(GATEWAY_NAME)
        td = getattr(gw, "td_api", None)
        if not td or not hasattr(td, "onRspQryInvestorPosition"):
            return
        bridge = self
        original = td.onRspQryInvestorPosition

        def _wrapped(data, error, reqid, last):
            try:
                if data and isinstance(data, dict):
                    sym = (data.get("InstrumentID") or "").strip()
                    pos_dir = str(data.get("PosiDirection") or "")
                    # "2" is mapped to long and "3" to short; other values
                    # fall back to a textual "LONG" check.
                    if pos_dir == "2":
                        d = "long"
                    elif pos_dir == "3":
                        d = "short"
                    else:
                        d = "long" if "LONG" in pos_dir.upper() else "short"
                    margin = float(
                        data.get("UseMargin") or data.get("ExchangeMargin") or 0
                    )
                    if sym and margin > 0:
                        k = bridge._position_margin_key(sym, d)
                        # Rows sharing a key are summed.
                        # NOTE(review): the cache is only cleared in
                        # refresh_positions; if position responses also arrive
                        # from queries issued elsewhere, the sum may grow
                        # across queries - confirm.
                        bridge._position_margins[k] = (
                            bridge._position_margins.get(k, 0.0) + margin
                        )
                    open_date = bridge._parse_ctp_open_datetime(
                        str(data.get("OpenDate") or data.get("open_date") or ""),
                        str(
                            data.get("OpenTime") or data.get("open_time")
                            or data.get("TradeTime") or ""
                        ),
                    )
                    if sym and open_date:
                        k = bridge._position_margin_key(sym, d)
                        prev = bridge._position_open_times.get(k, "")
                        # Keep the earliest open time (string comparison of
                        # the normalised "YYYY-MM-DD ..." form).
                        if not prev or open_date < prev:
                            bridge._position_open_times[k] = open_date
            except Exception as exc:
                logger.debug("margin hook row: %s", exc)
            # Always hand the row on to the original callback.
            return original(data, error, reqid, last)

        td.onRspQryInvestorPosition = _wrapped
        self._margin_hooked = True
    except Exception as exc:
        logger.debug("install margin hook: %s", exc)
def _lookup_position_margin(self, sym: str, direction: str) -> float:
return float(self._position_margins.get(self._position_margin_key(sym, direction), 0) or 0)
def estimate_margin_one_lot(self, ths_code: str, price: float) -> Optional[float]:
    """Estimate the margin of one lot from the engine's contract data.

    The estimate is price x contract size x the larger of the long and
    short margin ratios, rounded to 2 decimals. Returns None when there is
    no engine, the price is not positive, the contract is unknown, size or
    ratio is not positive, or the lookup fails.
    """
    if not self._engine or not price or price <= 0:
        return None
    try:
        sym, ex_name = ths_to_vnpy_symbol(ths_code)
        exchange = to_vnpy_exchange(ex_name)
        contract = self._engine.get_contract(f"{sym}.{exchange.value}")
        if not contract:
            return None
        multiplier = float(getattr(contract, "size", 0) or 0)
        long_ratio = float(getattr(contract, "long_margin_ratio", 0) or 0)
        short_ratio = float(getattr(contract, "short_margin_ratio", 0) or 0)
        margin_ratio = max(long_ratio, short_ratio)
        if multiplier <= 0 or margin_ratio <= 0:
            return None
        return round(float(price) * multiplier * margin_ratio, 2)
    except Exception as exc:
        logger.debug("estimate_margin_one_lot %s: %s", ths_code, exc)
        return None
def _collect_positions(self) -> list[dict[str, Any]]:
if not self._engine:
return []
out: list[dict[str, Any]] = []
for pos in self._engine.get_all_positions():
vol = int(getattr(pos, "volume", 0) or 0)
if vol <= 0:
continue
d = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short"
sym = getattr(pos, "symbol", "") or ""
exchange = getattr(pos, "exchange", None)
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
margin = self._lookup_position_margin(sym, d)
open_time = self._lookup_position_open_time(sym, d) or None
out.append({
"symbol": sym,
"exchange": ex_name,
"direction": d,
"lots": vol,
"avg_price": float(getattr(pos, "price", 0) or 0),
"pnl": float(getattr(pos, "pnl", 0) or 0),
"frozen": int(getattr(pos, "frozen", 0) or 0),
"margin": round(margin, 2) if margin > 0 else None,
"open_time": open_time,
})
return out
def refresh_positions(self) -> None:
    """Ask the counter for the current positions (used to fill an empty cache).

    Calls are throttled to one per second. Before querying, the cached
    margins and open times are cleared; after sending the query the call
    sleeps 0.4 s. Errors are logged at debug level.
    """
    if not self._engine:
        return
    started = time.time()
    if started - self._last_position_query_ts < 1.0:
        return
    self._last_position_query_ts = started
    try:
        self._install_position_margin_hook()
        gateway = self._engine.get_gateway(GATEWAY_NAME)
        td_api = getattr(gateway, "td_api", None)
        if td_api and hasattr(td_api, "query_position"):
            # Start from empty caches so the hook re-accumulates this query's rows.
            self._position_margins.clear()
            self._position_open_times.clear()
            td_api.query_position()
            time.sleep(0.4)
    except Exception as exc:
        logger.debug("refresh_positions: %s", exc)
def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]:
    """Return the current positions, optionally refreshing them from the counter.

    Args:
        refresh_if_empty: Query the counter once and collect again when the
            first collection is empty.
        refresh_margin: Query the counter before collecting (only while an
            engine exists and the bridge is connected).
    """
    wants_fresh_margin = self._engine and self._connected_mode and refresh_margin
    if wants_fresh_margin:
        self.refresh_positions()
    positions = self._collect_positions()
    if positions or not refresh_if_empty:
        return positions
    self.refresh_positions()
    return self._collect_positions()
@staticmethod
def _parse_trade_offset(offset_obj: Any) -> str:
s = str(offset_obj or "").upper()
if "OPEN" in s:
return "open"
return "close"
@staticmethod
def _parse_trade_direction(direction_obj: Any) -> str:
    """Return "long" when ``_is_long_direction`` accepts the object, else "short"."""
    if _is_long_direction(direction_obj):
        return "long"
    return "short"
@staticmethod
def _position_direction_from_trade(trade_direction: str, offset: str) -> str:
td = (trade_direction or "long").strip().lower()
if (offset or "open").strip().lower() == "open":
return td
return "short" if td == "long" else "long"
def _format_trade_datetime(self, dt_obj: Any, date_raw: str = "", time_raw: str = "") -> str:
if dt_obj is not None:
try:
if hasattr(dt_obj, "strftime"):
return dt_obj.strftime("%Y-%m-%d %H:%M:%S")
text = str(dt_obj).strip()
if text:
return text[:19].replace("T", " ")
except Exception:
pass
parsed = self._parse_ctp_open_datetime(date_raw, time_raw)
return parsed or ""
def _trade_row_from_vnpy(self, trade: Any) -> Optional[dict[str, Any]]:
try:
sym = (getattr(trade, "symbol", "") or "").strip()
vol = int(getattr(trade, "volume", 0) or 0)
if not sym or vol <= 0:
return None
direction = self._parse_trade_direction(getattr(trade, "direction", None))
offset = self._parse_trade_offset(getattr(trade, "offset", None))
exchange = getattr(trade, "exchange", None)
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
dt = self._format_trade_datetime(getattr(trade, "datetime", None))
trade_id = str(getattr(trade, "tradeid", "") or getattr(trade, "vt_tradeid", "") or "")
order_id = str(getattr(trade, "orderid", "") or getattr(trade, "vt_orderid", "") or "")
if not trade_id:
trade_id = f"{order_id}:{sym}:{offset}:{direction}:{vol}:{getattr(trade, 'price', 0)}:{dt}"
return {
"trade_id": trade_id,
"order_id": order_id,
"symbol": sym,
"exchange": ex_name,
"direction": direction,
"offset": offset,
"position_direction": self._position_direction_from_trade(direction, offset),
"lots": vol,
"price": float(getattr(trade, "price", 0) or 0),
"datetime": dt,
}
except Exception as exc:
logger.debug("trade_row_from_vnpy: %s", exc)
return None
def _trade_row_from_ctp_dict(self, data: dict) -> Optional[dict[str, Any]]:
try:
sym = (data.get("InstrumentID") or data.get("instrument_id") or "").strip()
vol = int(float(data.get("Volume") or data.get("volume") or 0))
if not sym or vol <= 0:
return None
dir_raw = str(data.get("Direction") or data.get("direction") or "")
direction = "long" if dir_raw in ("0", "2") or "LONG" in dir_raw.upper() or dir_raw == "" else "short"
off_raw = str(data.get("OffsetFlag") or data.get("offset") or "")
if off_raw in ("0",) or "OPEN" in off_raw.upper():
offset = "open"
else:
offset = "close"
price = float(data.get("Price") or data.get("price") or 0)
trade_id = str(data.get("TradeID") or data.get("tradeid") or "").strip()
order_sys = str(data.get("OrderSysID") or data.get("orderid") or "").strip()
dt = self._format_trade_datetime(
None,
str(data.get("TradeDate") or data.get("trade_date") or ""),
str(data.get("TradeTime") or data.get("trade_time") or ""),
)
if not trade_id:
trade_id = f"{order_sys}:{sym}:{offset}:{direction}:{vol}:{price}:{dt}"
return {
"trade_id": trade_id,
"order_id": order_sys,
"symbol": sym,
"exchange": str(data.get("ExchangeID") or data.get("exchange") or ""),
"direction": direction,
"offset": offset,
"position_direction": self._position_direction_from_trade(direction, offset),
"lots": vol,
"price": price,
"datetime": dt,
}
except Exception as exc:
logger.debug("trade_row_from_ctp_dict: %s", exc)
return None
def _install_trade_query_hook(self) -> None:
    """Wrap the gateway's ``onRspQryTrade`` callback once to capture fills.

    The wrapper converts each response dict into a trade row, appends it to
    ``self._trade_query_results``, forwards the call to the original
    callback, and sets ``self._trade_query_event`` when the response is
    flagged as the last one.  Does nothing if already installed, if there is
    no engine, or if the gateway has no ``td_api.onRspQryTrade``.
    """
    if self._trade_hooked or not self._engine:
        return
    try:
        gw = self._engine.get_gateway(GATEWAY_NAME)
        td = getattr(gw, "td_api", None)
        if not td or not hasattr(td, "onRspQryTrade"):
            return
        bridge = self
        original = td.onRspQryTrade

        def _wrapped(data, error, reqid, last):
            try:
                if data and isinstance(data, dict):
                    row = bridge._trade_row_from_ctp_dict(data)
                    if row:
                        bridge._trade_query_results.append(row)
            except Exception as exc:
                logger.debug("trade hook row: %s", exc)
            try:
                return original(data, error, reqid, last)
            finally:
                # Release the waiter even if the original callback raises;
                # otherwise refresh_trades would sit out its full timeout.
                if last:
                    bridge._trade_query_event.set()

        td.onRspQryTrade = _wrapped
        self._trade_hooked = True
    except Exception as exc:
        logger.debug("install trade hook: %s", exc)
def _collect_engine_trades(self) -> list[dict[str, Any]]:
if not self._engine:
return []
out: list[dict[str, Any]] = []
seen: set[str] = set()
try:
trades = self._engine.get_all_trades()
except Exception:
trades = {}
for trade in (trades or {}).values():
row = self._trade_row_from_vnpy(trade)
if not row:
continue
key = row["trade_id"]
if key in seen:
continue
seen.add(key)
out.append(row)
return out
def refresh_trades(self) -> None:
"""向柜台查询当日成交(并合并内存成交回报)。"""
if not self._engine:
return
now = time.time()
if now - self._last_trade_query_ts < 1.0:
return
self._last_trade_query_ts = now
self._trade_query_results = []
self._trade_query_event.clear()
try:
self._install_trade_query_hook()
gw = self._engine.get_gateway(GATEWAY_NAME)
td = getattr(gw, "td_api", None)
if td and hasattr(td, "query_trade"):
td.query_trade()
self._trade_query_event.wait(timeout=2.0)
except Exception as exc:
logger.debug("refresh_trades: %s", exc)
def list_trades(self, *, refresh: bool = False) -> list[dict[str, Any]]:
if refresh:
self.refresh_trades()
merged: dict[str, dict[str, Any]] = {}
for row in self._collect_engine_trades():
merged[row["trade_id"]] = row
for row in self._trade_query_results:
merged[row["trade_id"]] = row
out = list(merged.values())
out.sort(key=lambda r: (r.get("datetime") or "", r.get("trade_id") or ""))
return out
def list_active_orders(self) -> list[dict[str, Any]]:
if not self._engine:
return []
out: list[dict[str, Any]] = []
try:
orders = self._engine.get_all_active_orders()
except Exception:
return []
for order in orders or []:
status = getattr(order, "status", None)
status_s = str(status)
if status_s and not any(x in status_s for x in ("NOTTRADED", "PARTTRADED", "SUBMITTING")):
continue
vol = int(getattr(order, "volume", 0) or 0)
traded = int(getattr(order, "traded", 0) or 0)
remain = max(0, vol - traded)
if remain <= 0:
continue
direction = getattr(order, "direction", None)
d = "long"
if direction is not None and str(direction).endswith("SHORT"):
d = "short"
offset = getattr(order, "offset", None)
offset_s = str(offset or "")
sym = getattr(order, "symbol", "") or ""
exchange = getattr(order, "exchange", None)
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
out.append({
"symbol": sym,
"exchange": ex_name,
"direction": d,
"lots": remain,
"price": float(getattr(order, "price", 0) or 0),
"offset": offset_s,
"order_id": str(getattr(order, "orderid", "") or ""),
"status": status_s,
})
return out
def send_order(
    self,
    *,
    ths_code: str,
    offset: str,
    direction: str,
    lots: int,
    price: float,
    order_type: str = "limit",
) -> str:
    """Submit one order through the CTP gateway and return its order id.

    Args:
        ths_code: Contract code, converted with ``ths_to_vnpy_symbol``.
        offset: ``open``/``open_long``/``open_short`` or
            ``close``/``close_long``/``close_short``; empty means ``open``.
        direction: ``long`` or ``short``.  For opens it is the side to
            trade; for closes it is the side of the position being closed.
            When empty, the side is taken from a ``*_short`` offset suffix
            and otherwise defaults to long.
        lots: Order volume; values below 1 are raised to 1.
        price: Limit price, rounded to the contract tick.  For market
            orders it is passed to ``self._aggressive_limit_price``.
        order_type: ``limit`` (default) or ``market``; market orders are
            sent as FAK at the price returned by that helper.

    Returns:
        The vt_orderid returned by the engine, as a string.

    Raises:
        RuntimeError: The engine is missing, the trading channel is not
            logged in, or the engine returned no order id.
        ValueError: Unknown offset, or a non-positive resulting price.
    """
    from vnpy.trader.constant import Direction, Offset, OrderType
    from vnpy.trader.object import OrderRequest

    if not self._engine:
        raise RuntimeError("CTP 未初始化")
    if not self._td_logged_in():
        raise RuntimeError("CTP 交易通道未登录,请重连后再下单")
    sym, ex_name = ths_to_vnpy_symbol(ths_code)
    exchange = to_vnpy_exchange(ex_name)
    lots = max(1, int(lots))
    tick = float(get_contract_spec(ths_code).get("tick_size") or 1.0)
    offset = (offset or "open").lower()
    direction = (direction or "").lower()
    if not direction:
        # Defaulting straight to "long" made "open_short"/"close_short"
        # with no explicit direction trade the long side; let the offset
        # suffix decide when the caller gave no direction.
        direction = "short" if offset.endswith("_short") else "long"
    if offset in ("open", "open_long", "open_short"):
        d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT
        off = Offset.OPEN
    elif offset in ("close", "close_long", "close_short"):
        hold = "long" if direction == "long" or offset == "close_long" else "short"
        # Closing a long position sells; closing a short position buys.
        if hold == "long":
            d = Direction.SHORT
        else:
            d = Direction.LONG
        off = self._resolve_close_offset(sym, ex_name, hold, lots)
    else:
        raise ValueError(f"未知开平: {offset}")
    use_market = (order_type or "limit").lower() == "market"
    if use_market:
        ot = OrderType.FAK
        price = self._aggressive_limit_price(ths_code, sym, ex_name, d, tick, price)
    else:
        ot = OrderType.LIMIT
        price = round_to_tick(float(price), tick)
    if price <= 0:
        raise ValueError("委托价格无效,请检查行情或手动填写价格")
    req = OrderRequest(
        symbol=sym,
        exchange=exchange,
        direction=d,
        type=ot,
        volume=lots,
        price=price,
        offset=off,
    )
    logger.info(
        "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s",
        sym, ex_name, d, lots, price, off, ot,
    )
    vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
    if not vt_orderid:
        raise RuntimeError("CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)")
    return str(vt_orderid)
def cancel_order(self, vt_orderid: str) -> bool:
    """Request cancellation of an order known to the engine.

    Args:
        vt_orderid: The engine's order key.  A bare order id without the
            ``"<gateway>."`` prefix (the form ``list_active_orders`` reports
            as ``order_id``) is also accepted.

    Returns:
        ``True`` once the cancel request has been handed to the engine;
        ``False`` when there is no engine, the id is empty, the order is
        unknown, or the request raises.
    """
    if not self._engine or not vt_orderid:
        return False
    try:
        order = self._engine.get_order(vt_orderid)
        if order is None:
            # Retry with the gateway prefix so ids taken from
            # list_active_orders (which omit it) can still be cancelled.
            prefix = f"{GATEWAY_NAME}."
            if not str(vt_orderid).startswith(prefix):
                order = self._engine.get_order(f"{prefix}{vt_orderid}")
        if order is None:
            return False
        req = order.create_cancel_request()
        self._engine.cancel_order(req, GATEWAY_NAME)
        logger.info("CTP 撤单 %s", vt_orderid)
        return True
    except Exception as exc:
        logger.warning("CTP 撤单失败 %s: %s", vt_orderid, exc)
        return False
def get_bridge() -> CtpBridge:
    """Return the process-wide ``CtpBridge``, creating it on first use.

    Creation is guarded by ``_bridge_lock``.
    """
    global _bridge
    with _bridge_lock:
        bridge = _bridge
        if bridge is None:
            bridge = _bridge = CtpBridge()
        return bridge
def try_init_vnpy(_settings: dict | None = None) -> bool:
    """Report whether the shared bridge is available.

    ``_settings`` is accepted but not used.
    """
    bridge = get_bridge()
    return bridge.available()
def vnpy_available() -> bool:
    """Return the shared bridge's ``available()`` result."""
    bridge = get_bridge()
    return bridge.available()
def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
    """Connect the shared bridge for ``mode`` and return its status dict.

    ``force`` is passed through to ``CtpBridge.connect``.
    """
    bridge = get_bridge()
    bridge.connect(mode, force=force)
    status = bridge.status(mode)
    return status
def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
    """Start a connection without blocking; intended for the Web API.

    Returns the dict from ``start_connect_async`` with the bridge's current
    status added under the ``"status"`` key.
    """
    bridge = get_bridge()
    result = dict(bridge.start_connect_async(mode, force=force))
    result["status"] = bridge.status(mode)
    return result
def ctp_try_auto_reconnect(mode: str) -> bool:
    """Quietly start an asynchronous reconnect after a disconnect.

    Returns ``True`` immediately when the bridge is already connected in
    ``mode`` and its ping succeeds.  Returns ``False`` without connecting
    when the bridge is unavailable, a connect is already in progress, a
    login cooldown is active, the credentials or trading-server setting are
    missing, or the trading front fails a TCP probe (in which case the
    bridge's last error is set).  Otherwise a connect is started and the
    result reflects whether it reports connected, connecting or started.
    """
    b = get_bridge()
    if not b.available() or b.connect_in_progress() or b.login_cooldown_remaining() > 0:
        return False
    st = _setting_for_mode(mode)
    if not all(st.get(key) for key in ("用户名", "密码", "交易服务器")):
        return False
    if b.connected_mode == mode and b.ping():
        return True
    td = st.get("交易服务器", "")
    # Probe the front first so an unreachable address does not cost a login
    # attempt.
    ok, err = probe_tcp_address(td, timeout=4.0)
    if not ok:
        # Address and error are separated by parentheses; previously they
        # were run together with an unbalanced closing parenthesis.
        b._last_error = (
            f"SimNow 交易前置不可达:{td}({err})。"
            "请更新 SIMNOW_TD_ADDRESS 并确认服务器出网。"
        )
        return False
    info = b.start_connect_async(mode, force=False)
    return bool(
        info.get("connected")
        or info.get("connecting")
        or info.get("started")
    )
def ctp_status(mode: str) -> dict[str, Any]:
    """Return the bridge status for ``mode``, with a reachability probe.

    When the bridge is neither connected nor connecting and a trading-server
    address is configured, the address is TCP-probed: the outcome is stored
    under ``"td_reachable"`` and, if the probe fails and no error is already
    recorded, a message is stored under ``"last_error"``.
    """
    st = get_bridge().status(mode)
    if st.get("connected") or st.get("connecting"):
        return st
    td = _setting_for_mode(mode).get("交易服务器", "")
    if not td:
        return st
    ok, err = probe_tcp_address(td, timeout=3.0)
    st["td_reachable"] = ok
    if not ok and not st.get("last_error"):
        # Address and error are separated by parentheses; previously they
        # were run together with no separator.
        st["last_error"] = f"SimNow 交易前置不可达:{td}({err})"
    return st
def ctp_get_account(mode: str) -> dict[str, Any]:
    """Ensure the bridge is connected for ``mode`` and return its account dict."""
    bridge = get_bridge()
    bridge.ensure_connected(mode)
    account = bridge.get_account()
    return account
def ctp_list_positions(
    mode: str,
    *,
    refresh_if_empty: bool = True,
    refresh_margin: bool = False,
) -> list[dict[str, Any]]:
    """Return the bridge's positions, or ``[]`` when not connected.

    Positions are only read when the bridge is connected in ``mode`` and its
    ping succeeds; both keyword flags are passed through unchanged to
    ``CtpBridge.list_positions``.
    """
    bridge = get_bridge()
    if bridge.connected_mode == mode and bridge.ping():
        return bridge.list_positions(
            refresh_if_empty=refresh_if_empty,
            refresh_margin=refresh_margin,
        )
    return []
def ctp_list_active_orders(mode: str) -> list[dict[str, Any]]:
    """Ensure the bridge is connected for ``mode`` and list its working orders."""
    bridge = get_bridge()
    bridge.ensure_connected(mode)
    orders = bridge.list_active_orders()
    return orders
def ctp_cancel_order(mode: str, vt_orderid: str) -> bool:
    """Ensure the bridge is connected for ``mode`` and cancel ``vt_orderid``.

    Returns the result of ``CtpBridge.cancel_order``.
    """
    bridge = get_bridge()
    bridge.ensure_connected(mode)
    cancelled = bridge.cancel_order(vt_orderid)
    return cancelled
def ctp_list_trades(mode: str, *, refresh: bool = False) -> list[dict[str, Any]]:
    """Return the bridge's trade rows, or ``[]`` when not connected.

    Trades are only read when the bridge is connected in ``mode`` and its
    ping succeeds; ``refresh`` is passed through to ``CtpBridge.list_trades``.
    """
    bridge = get_bridge()
    if bridge.connected_mode == mode and bridge.ping():
        return bridge.list_trades(refresh=refresh)
    return []
def ctp_get_tick_price(mode: str, ths_code: str) -> Optional[float]:
    """Return the latest price from the CTP counter (needs a connected, subscribed bridge).

    Returns ``None`` when the bridge is connected in a different mode or the
    lookup raises; the exception is logged at debug level.
    """
    bridge = get_bridge()
    if bridge.connected_mode != mode:
        return None
    try:
        last_price = bridge.get_tick_price(ths_code, mode=mode)
    except Exception as exc:
        logger.debug("ctp_get_tick_price: %s", exc)
        return None
    return last_price
def ctp_get_tick_detail(mode: str, ths_code: str) -> dict[str, Any]:
    """Return the bridge's tick detail dict for ``ths_code``.

    Returns ``{}`` when the bridge is connected in a different mode or the
    lookup raises; the exception is logged at debug level.
    """
    bridge = get_bridge()
    if bridge.connected_mode != mode:
        return {}
    try:
        detail = bridge.get_tick_detail(ths_code, mode=mode)
    except Exception as exc:
        logger.debug("ctp_get_tick_detail: %s", exc)
        return {}
    return detail
def ctp_estimate_margin_one_lot(mode: str, ths_code: str, price: float) -> Optional[float]:
    """Return the bridge's one-lot margin estimate for ``ths_code`` at ``price``.

    Returns ``None`` when the bridge is not connected in ``mode``, its ping
    fails, or the estimate raises; the exception is logged at debug level.
    """
    bridge = get_bridge()
    is_ready = bridge.connected_mode == mode and bridge.ping()
    if not is_ready:
        return None
    try:
        estimate = bridge.estimate_margin_one_lot(ths_code, price)
    except Exception as exc:
        logger.debug("ctp_estimate_margin_one_lot: %s", exc)
        return None
    return estimate
def get_ctp_balance(mode: str) -> Optional[float]:
    """Return the account balance for ``mode`` as a float.

    Returns ``None`` when the balance is missing or falsy (including zero),
    or when fetching or converting it raises; the exception is logged at
    debug level.
    """
    try:
        balance = ctp_get_account(mode).get("balance")
        if not balance:
            return None
        return float(balance)
    except Exception as exc:
        logger.debug("get_ctp_balance: %s", exc)
        return None
def execute_order(
    conn,
    *,
    mode: str,
    offset: str,
    symbol: str,
    direction: str,
    lots: int,
    price: float,
    settings: dict | None = None,
    order_type: str = "limit",
) -> dict[str, Any]:
    """Unified order entry: ``simulation`` uses SimNow, ``live`` the broker's CTP.

    Args:
        conn: Accepted but not used.
        mode: ``"simulation"`` or ``"live"``.
        offset, symbol, direction, lots, price, order_type: Passed to
            ``CtpBridge.send_order`` (``symbol`` as ``ths_code``).
        settings: Accepted but not used.

    Returns:
        A dict with ``order_id``, ``mode``, ``mode_label``, ``symbol``,
        ``lots`` and ``price``.

    Raises:
        ValueError: Unknown ``mode``, or the bridge reports it is not
            available.  Errors from ``require_connected`` and ``send_order``
            propagate unchanged.
    """
    del conn, settings
    if mode not in ("simulation", "live"):
        raise ValueError("未知交易模式")
    if not vnpy_available():
        # The package name and the pip command are separated by a colon;
        # previously they ran together as "vnpy_ctppip install".
        raise ValueError(
            "请先安装 vnpy 与 vnpy_ctp:pip install vnpy vnpy_ctp\n"
            "模拟盘需配置 .env 中 SIMNOW_USER / SIMNOW_PASSWORD 等"
        )
    b = get_bridge()
    b.require_connected(mode)
    order_id = b.send_order(
        ths_code=symbol,
        offset=offset,
        direction=direction,
        lots=lots,
        price=price,
        order_type=order_type,
    )
    return {
        "order_id": order_id,
        "mode": mode,
        "mode_label": _mode_label(mode),
        "symbol": symbol,
        "lots": lots,
        "price": price,
    }