Files
qihuo/vnpy_bridge.py
T
dekun 6e423eebfb 接入 SimNow 模拟盘与期货下单、策略及品种推荐功能。
新增 vnpy CTP 桥接、以损定仓/固定张数、趋势回调与滚仓策略、按资金推荐品种及交易风控;模拟盘走 SimNow,实盘预留期货公司配置。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-24 10:04:37 +08:00

320 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""CTP 执行层:模拟盘 → SimNow;实盘 → 期货公司(vnpy_ctp)。"""
from __future__ import annotations
import logging
import os
import threading
import time
from typing import Any, Optional
from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Gateway name handed to the vnpy engine's connect() and send_order() calls.
GATEWAY_NAME = "CTP"
# Process-wide CtpBridge instance; created lazily by get_bridge().
_bridge: Optional["CtpBridge"] = None
# Serialises the lazy creation of the shared bridge in get_bridge().
_bridge_lock = threading.Lock()
def _env(key: str, default: str = "") -> str:
    """Return environment variable *key* with surrounding whitespace removed.

    When the variable is unset or empty, *default* is used instead (and is
    stripped the same way).
    """
    raw = os.getenv(key)
    if not raw:
        raw = default
    return raw.strip()
def _simnow_setting() -> dict[str, str]:
    """Build the SimNow connection settings from the environment.

    The defaults are the SimNow 7x24 simulation front addresses; every value
    can be overridden through the corresponding ``SIMNOW_*`` variable
    (e.g. in ``.env``).
    """
    # (setting key, environment variable, default value)
    fields = (
        ("用户名", "SIMNOW_USER", ""),
        ("密码", "SIMNOW_PASSWORD", ""),
        ("经纪商代码", "SIMNOW_BROKER_ID", "9999"),
        ("交易服务器", "SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201"),
        ("行情服务器", "SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211"),
        ("产品名称", "SIMNOW_APP_ID", "simnow_client_test"),
        ("授权编码", "SIMNOW_AUTH_CODE", "0000000000000000"),
        ("产品信息", "SIMNOW_PRODUCT_INFO", "simnow_client_test"),
    )
    return {label: _env(variable, fallback) for label, variable, fallback in fields}
def _live_setting() -> dict[str, str]:
    """Build the live (futures broker) connection settings from the environment.

    There are no defaults: every value comes from a ``CTP_LIVE_*`` variable
    and is an empty string when that variable is not set.
    """
    # (setting key, environment variable)
    fields = (
        ("用户名", "CTP_LIVE_USER"),
        ("密码", "CTP_LIVE_PASSWORD"),
        ("经纪商代码", "CTP_LIVE_BROKER_ID"),
        ("交易服务器", "CTP_LIVE_TD_ADDRESS"),
        ("行情服务器", "CTP_LIVE_MD_ADDRESS"),
        ("产品名称", "CTP_LIVE_APP_ID"),
        ("授权编码", "CTP_LIVE_AUTH_CODE"),
        ("产品信息", "CTP_LIVE_PRODUCT_INFO"),
    )
    return {label: _env(variable) for label, variable in fields}
def _setting_for_mode(mode: str) -> dict[str, str]:
    """Return the SimNow settings for ``"simulation"``, the live settings otherwise."""
    if mode == "simulation":
        return _simnow_setting()
    return _live_setting()
def _mode_label(mode: str) -> str:
    """Return the human-readable label for *mode*.

    Any value other than ``"simulation"`` is labelled as the live broker.
    """
    if mode == "simulation":
        return "SimNow 模拟"
    return "期货公司实盘"
class CtpBridge:
    """Own one vnpy ``MainEngine`` with a CTP gateway and expose trading helpers.

    The engine is created in ``__init__``. When vnpy / vnpy_ctp cannot be
    imported (or engine construction fails) the bridge stays unavailable and
    ``last_error`` holds the reason.
    """

    def __init__(self) -> None:
        self._engine = None
        self._ee = None
        # Mode ("simulation" / "live") of the currently established session.
        self._connected_mode: Optional[str] = None
        self._last_error: str = ""
        # Serialises connect() so only one login sequence runs at a time.
        self._connect_lock = threading.Lock()
        self._init_engine()

    def _init_engine(self) -> None:
        """Create the event engine and main engine and register the CTP gateway.

        Failures are recorded in ``_last_error`` instead of being raised. The
        engine is only published on ``self`` once the gateway has been
        registered, so ``available()`` never reports a half-built engine.
        """
        try:
            from vnpy.event import EventEngine
            from vnpy.trader.engine import MainEngine
            from vnpy_ctp import CtpGateway

            event_engine = EventEngine()
            engine = MainEngine(event_engine)
            engine.add_gateway(CtpGateway)
        except ImportError:
            self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
        except Exception as exc:
            self._last_error = str(exc)
        else:
            self._ee = event_engine
            self._engine = engine

    def _reset_engine(self) -> None:
        """Close the current engine and build a fresh one.

        vnpy's ``MainEngine.close()`` stops its event engine, so a closed
        engine cannot process the login/account events of a new session and
        would also keep serving the previous session's cached accounts.

        Raises:
            RuntimeError: if the replacement engine cannot be created.
        """
        try:
            self._engine.close()
        except Exception:
            # Best effort: a failed close must not block the mode switch.
            logger.warning("CTP engine close failed", exc_info=True)
        self._connected_mode = None
        self._engine = None
        self._ee = None
        # Give the gateway a moment to release its connections.
        time.sleep(1)
        self._init_engine()
        if self._engine is None:
            raise RuntimeError(self._last_error or "vnpy 引擎未初始化")

    @staticmethod
    def _is_short(direction: Any) -> bool:
        """Tell whether a position's direction object denotes the short side.

        Accepts an enum member whose string form ends with ``SHORT`` as well
        as the Chinese value "空" that vnpy uses for ``Direction.SHORT``.
        """
        if direction is None:
            return False
        text = str(direction)
        value = str(getattr(direction, "value", ""))
        return text.endswith("SHORT") or "空" in text or "空" in value

    def available(self) -> bool:
        """Return True when the vnpy engine was created successfully."""
        return self._engine is not None

    @property
    def last_error(self) -> str:
        """Most recent initialisation/connection error message ("" if none)."""
        return self._last_error

    @property
    def connected_mode(self) -> Optional[str]:
        """Mode of the current session, or None when not connected."""
        return self._connected_mode

    def status(self, mode: str) -> dict[str, Any]:
        """Summarise installation, connection and configuration state for *mode*."""
        st = _setting_for_mode(mode)
        missing = [k for k in ("用户名", "密码", "交易服务器") if not st.get(k)]
        return {
            "vnpy_installed": self.available(),
            "connected": self._connected_mode == mode,
            "connected_mode": self._connected_mode,
            "mode_label": _mode_label(mode),
            "missing_config": missing,
            "last_error": self._last_error,
            "broker_id": st.get("经纪商代码", ""),
            "td_address": st.get("交易服务器", ""),
        }

    def connect(self, mode: str, *, force: bool = False) -> None:
        """Log in to the CTP front for *mode* and wait for account data.

        Args:
            mode: ``"simulation"`` for SimNow, anything else for the live broker.
            force: issue the connect call even if *mode* is already connected.

        Raises:
            RuntimeError: the engine is unavailable, or no account data
                arrived within the polling window (30 polls, 0.5 s apart).
            ValueError: user name, password or trade server address is missing.
        """
        if not self._engine:
            raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
        if self._connected_mode == mode and not force:
            return
        setting = _setting_for_mode(mode)
        if not setting.get("用户名") or not setting.get("密码"):
            raise ValueError(
                f"{_mode_label(mode)}:请在 .env 配置 "
                f"{'SIMNOW_USER / SIMNOW_PASSWORD' if mode == 'simulation' else 'CTP_LIVE_USER / CTP_LIVE_PASSWORD'}"
            )
        if not setting.get("交易服务器"):
            raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
        with self._connect_lock:
            # Another thread may have finished connecting while we waited.
            if self._connected_mode == mode and not force:
                return
            if self._connected_mode and self._connected_mode != mode:
                # Switching modes: drop the old session and start from a
                # clean engine rather than reusing the closed one.
                self._reset_engine()
            self._engine.connect(setting, GATEWAY_NAME)
            # Wait for login and settlement info: account data showing up
            # is used as the signal that the session is usable.
            for _ in range(30):
                accounts = self._engine.get_all_accounts()
                if accounts:
                    self._connected_mode = mode
                    self._last_error = ""
                    logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
                    return
                time.sleep(0.5)
            self._last_error = "CTP 连接超时,请检查 SimNow 账号、前置地址与交易时段"
            raise RuntimeError(self._last_error)

    def ensure_connected(self, mode: str) -> None:
        """Connect to *mode* unless it is already the active session."""
        if self._connected_mode != mode:
            self.connect(mode)

    def get_account(self) -> dict[str, Any]:
        """Return balance/available/frozen/accountid of the first account.

        Returns an empty dict when the engine is unavailable or no account
        data has been received.
        """
        if not self._engine:
            return {}
        accounts = self._engine.get_all_accounts()
        if not accounts:
            return {}
        acc = accounts[0]
        return {
            "balance": float(getattr(acc, "balance", 0) or 0),
            "available": float(getattr(acc, "available", 0) or 0),
            "frozen": float(getattr(acc, "frozen", 0) or 0),
            "accountid": getattr(acc, "accountid", ""),
        }

    def list_positions(self) -> list[dict[str, Any]]:
        """Return all positions with a positive volume as plain dicts.

        Each entry has ``symbol``, ``direction`` ("long"/"short"), ``lots``,
        ``avg_price`` and ``pnl``. Returns an empty list when the engine is
        unavailable.
        """
        if not self._engine:
            return []
        out: list[dict[str, Any]] = []
        for pos in self._engine.get_all_positions():
            vol = int(getattr(pos, "volume", 0) or 0)
            if vol <= 0:
                continue
            side = "short" if self._is_short(getattr(pos, "direction", None)) else "long"
            out.append({
                "symbol": getattr(pos, "symbol", "") or "",
                "direction": side,
                "lots": vol,
                "avg_price": float(getattr(pos, "price", 0) or 0),
                "pnl": float(getattr(pos, "pnl", 0) or 0),
            })
        return out

    def send_order(
        self,
        *,
        ths_code: str,
        offset: str,
        direction: str,
        lots: int,
        price: float,
    ) -> str:
        """Submit a limit order through the CTP gateway and return its order id.

        Args:
            ths_code: instrument code, converted with ``ths_to_vnpy_symbol``.
            offset: ``open`` / ``open_long`` / ``open_short`` / ``close`` /
                ``close_long`` / ``close_short`` (case-insensitive, empty
                means ``open``).
            direction: ``long`` or ``short`` side of the position being
                opened or closed. When empty it is taken from the offset
                suffix, falling back to ``long``.
            lots: order volume; values below 1 are raised to 1.
            price: limit price.

        Raises:
            RuntimeError: the engine is unavailable or no order id came back.
            ValueError: *offset* is not one of the accepted values.
        """
        # Check the engine first so a missing vnpy install surfaces as the
        # intended RuntimeError rather than an ImportError from the imports.
        if not self._engine:
            raise RuntimeError("CTP 未初始化")

        from vnpy.trader.constant import Direction, Offset, OrderType
        from vnpy.trader.object import OrderRequest

        sym, ex_name = ths_to_vnpy_symbol(ths_code)
        exchange = to_vnpy_exchange(ex_name)
        lots = max(1, int(lots))
        price = float(price)
        offset = (offset or "open").lower()
        direction = (direction or "").lower()
        if not direction:
            # No explicit side: honour an "_short" offset suffix instead of
            # silently treating e.g. "open_short" as a long order.
            direction = "short" if offset.endswith("_short") else "long"
        if offset in ("open", "open_long", "open_short"):
            d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT
            off = Offset.OPEN
        elif offset in ("close", "close_long", "close_short"):
            # Closing a long position sells; closing a short position buys.
            if direction == "long" or offset == "close_long":
                d = Direction.SHORT
            else:
                d = Direction.LONG
            # NOTE(review): some exchanges distinguish close-today from
            # close-yesterday; confirm Offset.CLOSE is accepted everywhere.
            off = Offset.CLOSE
        else:
            raise ValueError(f"未知开平: {offset}")
        req = OrderRequest(
            symbol=sym,
            exchange=exchange,
            direction=d,
            type=OrderType.LIMIT,
            volume=lots,
            price=price,
            offset=off,
        )
        vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
        if not vt_orderid:
            raise RuntimeError("CTP 拒单或未返回委托号")
        return str(vt_orderid)
def get_bridge() -> CtpBridge:
    """Return the shared CtpBridge, creating it on first use under the module lock."""
    global _bridge
    with _bridge_lock:
        bridge = _bridge
        if bridge is None:
            bridge = _bridge = CtpBridge()
    return bridge
def try_init_vnpy(_settings: dict | None = None) -> bool:
    """Ensure the shared bridge exists and report whether its engine is usable.

    ``_settings`` is accepted but not used.
    """
    bridge = get_bridge()
    return bridge.available()
def vnpy_available() -> bool:
    """Return True when the shared bridge holds a working vnpy engine."""
    bridge = get_bridge()
    return bridge.available()
def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
    """Connect the shared bridge to *mode* and return its status afterwards.

    Exceptions raised by the bridge's ``connect`` propagate to the caller.
    """
    bridge = get_bridge()
    bridge.connect(mode, force=force)
    return bridge.status(mode)
def ctp_status(mode: str) -> dict[str, Any]:
    """Return the shared bridge's status dict for *mode* without connecting."""
    bridge = get_bridge()
    return bridge.status(mode)
def ctp_get_account(mode: str) -> dict[str, Any]:
    """Make sure *mode* is connected, then return the account summary."""
    bridge = get_bridge()
    bridge.ensure_connected(mode)
    return bridge.get_account()
def ctp_list_positions(mode: str) -> list[dict[str, Any]]:
    """Make sure *mode* is connected, then return the open positions."""
    bridge = get_bridge()
    bridge.ensure_connected(mode)
    return bridge.list_positions()
def get_ctp_balance(mode: str) -> Optional[float]:
    """Return the account balance for *mode*, or None when it is unavailable.

    Any exception (connection failure included) is logged at debug level and
    turned into None. A missing or zero balance also yields None.
    """
    try:
        balance = ctp_get_account(mode).get("balance")
        if not balance:
            return None
        return float(balance)
    except Exception as exc:
        logger.debug("get_ctp_balance: %s", exc)
        return None
def execute_order(
    conn,
    *,
    mode: str,
    offset: str,
    symbol: str,
    direction: str,
    lots: int,
    price: float,
    settings: dict | None = None,
) -> dict[str, Any]:
    """Place an order through CTP: ``simulation`` uses SimNow, ``live`` the broker.

    Args:
        conn: accepted but not used.
        mode: ``"simulation"`` or ``"live"``.
        offset: open/close instruction forwarded to the bridge.
        symbol: instrument code forwarded to the bridge as ``ths_code``.
        direction: ``long`` / ``short`` forwarded to the bridge.
        lots: requested volume; values below 1 are raised to 1.
        price: limit price.
        settings: accepted but not used.

    Returns:
        Dict with ``order_id``, ``mode``, ``mode_label``, ``symbol``, ``lots``
        (the volume actually submitted) and ``price``.

    Raises:
        ValueError: unknown *mode*, or vnpy / vnpy_ctp is not available.
    """
    del conn, settings
    if mode not in ("simulation", "live"):
        raise ValueError("未知交易模式")
    if not vnpy_available():
        raise ValueError(
            "请先安装 vnpy 与 vnpy_ctppip install vnpy vnpy_ctp\n"
            "模拟盘需配置 .env 中 SIMNOW_USER / SIMNOW_PASSWORD 等"
        )
    bridge = get_bridge()
    bridge.ensure_connected(mode)
    # Apply the same clamp the bridge's send_order uses, so the volume
    # reported back matches what was really submitted.
    sent_lots = max(1, int(lots))
    order_id = bridge.send_order(
        ths_code=symbol,
        offset=offset,
        direction=direction,
        lots=sent_lots,
        price=price,
    )
    return {
        "order_id": order_id,
        "mode": mode,
        "mode_label": _mode_label(mode),
        "symbol": symbol,
        "lots": sent_lots,
        "price": price,
    }