Files
qihuo/vnpy_bridge.py
T
dekun f73d436077 fix: CTP 连接后 locale 崩溃,PM2 设置 LANG=C.UTF-8
vnpy_ctp C++ 扩展在缺 locale 时会 terminate;补充 SimNow 备用前置说明。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-24 11:32:20 +08:00

376 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""CTP 执行层:模拟盘 → SimNow;实盘 → 期货公司(vnpy_ctp)。"""
from __future__ import annotations
import logging
import os
import threading
import time
from typing import Any, Optional
# vnpy_ctp C++ 在部分 Linux 上缺 locale 会抛 std::runtime_error
os.environ.setdefault("LANG", "C.UTF-8")
os.environ.setdefault("LC_ALL", "C.UTF-8")
from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange
logger = logging.getLogger(__name__)
GATEWAY_NAME = "CTP"
_bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock()
def _env(key: str, default: str = "") -> str:
return (os.getenv(key) or default).strip()
def _simnow_setting() -> dict[str, str]:
"""SimNow 仿真前置(可在 .env 覆盖)。看穿式前置需「柜台环境=实盘」。"""
return {
"用户名": _env("SIMNOW_USER"),
"密码": _env("SIMNOW_PASSWORD"),
"经纪商代码": _env("SIMNOW_BROKER_ID", "9999"),
"交易服务器": _env("SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201"),
"行情服务器": _env("SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211"),
"产品名称": _env("SIMNOW_APP_ID", "simnow_client_test"),
"授权编码": _env("SIMNOW_AUTH_CODE", "0000000000000000"),
"柜台环境": _env("SIMNOW_ENV", "实盘"),
}
def _live_setting() -> dict[str, str]:
return {
"用户名": _env("CTP_LIVE_USER"),
"密码": _env("CTP_LIVE_PASSWORD"),
"经纪商代码": _env("CTP_LIVE_BROKER_ID"),
"交易服务器": _env("CTP_LIVE_TD_ADDRESS"),
"行情服务器": _env("CTP_LIVE_MD_ADDRESS"),
"产品名称": _env("CTP_LIVE_APP_ID"),
"授权编码": _env("CTP_LIVE_AUTH_CODE"),
"柜台环境": _env("CTP_LIVE_ENV", "实盘"),
}
def _setting_for_mode(mode: str) -> dict[str, str]:
return _simnow_setting() if mode == "simulation" else _live_setting()
def _mode_label(mode: str) -> str:
return "SimNow 模拟" if mode == "simulation" else "期货公司实盘"
def _format_ctp_failure(ctp_logs: list[str]) -> str:
"""根据 CTP 网关日志拼出可读错误。"""
text = "\n".join(ctp_logs)
if "4097" in text or "Decrypt handshake" in text or "shake hand" in text.lower():
return (
"CTP 握手失败(4097)vnpy_ctp 与 SimNow 前置加密不匹配。"
"请执行 pip install -U vnpy vnpy_ctp 后重启,并确认 .env 中 SIMNOW_ENV=实盘"
)
if "不合法的登录" in text or "密码" in text or "账号" in text:
tail = ctp_logs[-1] if ctp_logs else ""
return f"CTP 登录被拒:{tail or '请检查投资者代码与密码(快期能否登录)'}"
if "连接断开" in text or "disconnect" in text.lower():
tail = ctp_logs[-1] if ctp_logs else ""
return f"CTP 连接断开:{tail or '请检查前置地址与网络'}"
if ctp_logs:
return f"CTP 连接失败:{ctp_logs[-1]}"
return "CTP 连接超时:未收到柜台回报。请检查 SimNow 账号、前置地址、网络(nc 测端口),并用快期验证账号"
class CtpBridge:
def __init__(self) -> None:
self._engine = None
self._ee = None
self._connected_mode: Optional[str] = None
self._last_error: str = ""
self._connect_lock = threading.Lock()
self._init_engine()
def _init_engine(self) -> None:
try:
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy_ctp import CtpGateway
self._ee = EventEngine()
self._engine = MainEngine(self._ee)
self._engine.add_gateway(CtpGateway)
except ImportError:
self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
except Exception as exc:
self._last_error = str(exc)
def available(self) -> bool:
return self._engine is not None
@property
def last_error(self) -> str:
return self._last_error
@property
def connected_mode(self) -> Optional[str]:
return self._connected_mode
def status(self, mode: str) -> dict[str, Any]:
st = _setting_for_mode(mode)
missing = [k for k in ("用户名", "密码", "交易服务器") if not st.get(k)]
return {
"vnpy_installed": self.available(),
"connected": self._connected_mode == mode,
"connected_mode": self._connected_mode,
"mode_label": _mode_label(mode),
"missing_config": missing,
"last_error": self._last_error,
"broker_id": st.get("经纪商代码", ""),
"td_address": st.get("交易服务器", ""),
}
def connect(self, mode: str, *, force: bool = False) -> None:
if not self._engine:
raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
if self._connected_mode == mode and not force:
return
setting = _setting_for_mode(mode)
if not setting.get("用户名") or not setting.get("密码"):
raise ValueError(
f"{_mode_label(mode)}:请在 .env 配置 "
f"{'SIMNOW_USER / SIMNOW_PASSWORD' if mode == 'simulation' else 'CTP_LIVE_USER / CTP_LIVE_PASSWORD'}"
)
if not setting.get("交易服务器"):
raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
with self._connect_lock:
if self._connected_mode and self._connected_mode != mode:
try:
self._engine.close()
except Exception:
pass
self._connected_mode = None
time.sleep(1)
ctp_logs: list[str] = []
from vnpy.trader.event import EVENT_LOG
def _on_log(event) -> None:
msg = getattr(event.data, "msg", "") or str(event.data)
if msg:
ctp_logs.append(str(msg))
if len(ctp_logs) > 20:
ctp_logs.pop(0)
logger.info("CTP | %s", msg)
self._ee.register(EVENT_LOG, _on_log)
try:
logger.info(
"CTP 连接 [%s] user=%s td=%s env=%s",
mode,
setting.get("用户名"),
setting.get("交易服务器"),
setting.get("柜台环境", "实盘"),
)
self._engine.connect(setting, GATEWAY_NAME)
# 等待登录与结算信息(最多约 30 秒)
for _ in range(60):
accounts = self._engine.get_all_accounts()
if accounts:
self._connected_mode = mode
self._last_error = ""
logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
return
time.sleep(0.5)
finally:
self._ee.unregister(EVENT_LOG, _on_log)
hint = _format_ctp_failure(ctp_logs)
self._last_error = hint
raise RuntimeError(hint)
def ensure_connected(self, mode: str) -> None:
if self._connected_mode != mode:
self.connect(mode)
def get_account(self) -> dict[str, Any]:
if not self._engine:
return {}
accounts = self._engine.get_all_accounts()
if not accounts:
return {}
acc = accounts[0]
return {
"balance": float(getattr(acc, "balance", 0) or 0),
"available": float(getattr(acc, "available", 0) or 0),
"frozen": float(getattr(acc, "frozen", 0) or 0),
"accountid": getattr(acc, "accountid", ""),
}
def list_positions(self) -> list[dict[str, Any]]:
if not self._engine:
return []
out: list[dict[str, Any]] = []
for pos in self._engine.get_all_positions():
vol = int(getattr(pos, "volume", 0) or 0)
if vol <= 0:
continue
direction = getattr(pos, "direction", None)
d = "long"
if direction is not None and str(direction).endswith("SHORT"):
d = "short"
elif direction is not None and "" in str(direction):
d = "short"
sym = getattr(pos, "symbol", "") or ""
exchange = getattr(pos, "exchange", None)
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
out.append({
"symbol": sym,
"exchange": ex_name,
"direction": d,
"lots": vol,
"avg_price": float(getattr(pos, "price", 0) or 0),
"pnl": float(getattr(pos, "pnl", 0) or 0),
"frozen": int(getattr(pos, "frozen", 0) or 0),
})
return out
def send_order(
self,
*,
ths_code: str,
offset: str,
direction: str,
lots: int,
price: float,
order_type: str = "limit",
) -> str:
from vnpy.trader.constant import Direction, Offset, OrderType
from vnpy.trader.object import OrderRequest
if not self._engine:
raise RuntimeError("CTP 未初始化")
sym, ex_name = ths_to_vnpy_symbol(ths_code)
exchange = to_vnpy_exchange(ex_name)
lots = max(1, int(lots))
price = float(price)
offset = (offset or "open").lower()
direction = (direction or "long").lower()
if offset in ("open", "open_long", "open_short"):
d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT
off = Offset.OPEN
elif offset in ("close", "close_long", "close_short"):
# 平多 = 卖;平空 = 买
if direction == "long" or offset == "close_long":
d = Direction.SHORT
else:
d = Direction.LONG
off = Offset.CLOSE
else:
raise ValueError(f"未知开平: {offset}")
ot = OrderType.MARKET if (order_type or "limit").lower() == "market" else OrderType.LIMIT
req = OrderRequest(
symbol=sym,
exchange=exchange,
direction=d,
type=ot,
volume=lots,
price=price,
offset=off,
)
vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
if not vt_orderid:
raise RuntimeError("CTP 拒单或未返回委托号")
return str(vt_orderid)
def get_bridge() -> CtpBridge:
global _bridge
with _bridge_lock:
if _bridge is None:
_bridge = CtpBridge()
return _bridge
def try_init_vnpy(_settings: dict | None = None) -> bool:
return get_bridge().available()
def vnpy_available() -> bool:
return get_bridge().available()
def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
b = get_bridge()
b.connect(mode, force=force)
return b.status(mode)
def ctp_status(mode: str) -> dict[str, Any]:
return get_bridge().status(mode)
def ctp_get_account(mode: str) -> dict[str, Any]:
b = get_bridge()
b.ensure_connected(mode)
return b.get_account()
def ctp_list_positions(mode: str) -> list[dict[str, Any]]:
b = get_bridge()
b.ensure_connected(mode)
return b.list_positions()
def get_ctp_balance(mode: str) -> Optional[float]:
try:
acc = ctp_get_account(mode)
bal = acc.get("balance")
return float(bal) if bal else None
except Exception as exc:
logger.debug("get_ctp_balance: %s", exc)
return None
def execute_order(
conn,
*,
mode: str,
offset: str,
symbol: str,
direction: str,
lots: int,
price: float,
settings: dict | None = None,
order_type: str = "limit",
) -> dict[str, Any]:
"""统一下单:simulation=SimNowlive=期货公司 CTP。"""
del conn, settings
if mode not in ("simulation", "live"):
raise ValueError("未知交易模式")
if not vnpy_available():
raise ValueError(
"请先安装 vnpy 与 vnpy_ctppip install vnpy vnpy_ctp\n"
f"模拟盘需配置 .env 中 SIMNOW_USER / SIMNOW_PASSWORD 等"
)
b = get_bridge()
b.ensure_connected(mode)
order_id = b.send_order(
ths_code=symbol,
offset=offset,
direction=direction,
lots=lots,
price=price,
order_type=order_type,
)
return {
"order_id": order_id,
"mode": mode,
"mode_label": _mode_label(mode),
"symbol": symbol,
"lots": lots,
"price": price,
}