接入 SimNow 模拟盘与期货下单、策略及品种推荐功能。

新增 vnpy CTP 桥接、以损定仓/固定张数、趋势回调与滚仓策略、按资金推荐品种及交易风控;模拟盘走 SimNow,实盘预留期货公司配置。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-24 10:04:37 +08:00
parent 9c0e5d9c57
commit 6e423eebfb
30 changed files with 2789 additions and 60 deletions
+319
View File
@@ -0,0 +1,319 @@
"""CTP 执行层:模拟盘 → SimNow;实盘 → 期货公司(vnpy_ctp)。"""
from __future__ import annotations
import logging
import os
import threading
import time
from typing import Any, Optional
from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange
logger = logging.getLogger(__name__)
GATEWAY_NAME = "CTP"
_bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock()
def _env(key: str, default: str = "") -> str:
return (os.getenv(key) or default).strip()
def _simnow_setting() -> dict[str, str]:
"""SimNow 7×24 仿真默认前置(可在 .env 覆盖)。"""
return {
"用户名": _env("SIMNOW_USER"),
"密码": _env("SIMNOW_PASSWORD"),
"经纪商代码": _env("SIMNOW_BROKER_ID", "9999"),
"交易服务器": _env("SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201"),
"行情服务器": _env("SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211"),
"产品名称": _env("SIMNOW_APP_ID", "simnow_client_test"),
"授权编码": _env("SIMNOW_AUTH_CODE", "0000000000000000"),
"产品信息": _env("SIMNOW_PRODUCT_INFO", "simnow_client_test"),
}
def _live_setting() -> dict[str, str]:
return {
"用户名": _env("CTP_LIVE_USER"),
"密码": _env("CTP_LIVE_PASSWORD"),
"经纪商代码": _env("CTP_LIVE_BROKER_ID"),
"交易服务器": _env("CTP_LIVE_TD_ADDRESS"),
"行情服务器": _env("CTP_LIVE_MD_ADDRESS"),
"产品名称": _env("CTP_LIVE_APP_ID"),
"授权编码": _env("CTP_LIVE_AUTH_CODE"),
"产品信息": _env("CTP_LIVE_PRODUCT_INFO"),
}
def _setting_for_mode(mode: str) -> dict[str, str]:
return _simnow_setting() if mode == "simulation" else _live_setting()
def _mode_label(mode: str) -> str:
return "SimNow 模拟" if mode == "simulation" else "期货公司实盘"
class CtpBridge:
def __init__(self) -> None:
self._engine = None
self._ee = None
self._connected_mode: Optional[str] = None
self._last_error: str = ""
self._connect_lock = threading.Lock()
self._init_engine()
def _init_engine(self) -> None:
try:
from vnpy.event import EventEngine
from vnpy.trader.engine import MainEngine
from vnpy_ctp import CtpGateway
self._ee = EventEngine()
self._engine = MainEngine(self._ee)
self._engine.add_gateway(CtpGateway)
except ImportError:
self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
except Exception as exc:
self._last_error = str(exc)
def available(self) -> bool:
return self._engine is not None
@property
def last_error(self) -> str:
return self._last_error
@property
def connected_mode(self) -> Optional[str]:
return self._connected_mode
def status(self, mode: str) -> dict[str, Any]:
st = _setting_for_mode(mode)
missing = [k for k in ("用户名", "密码", "交易服务器") if not st.get(k)]
return {
"vnpy_installed": self.available(),
"connected": self._connected_mode == mode,
"connected_mode": self._connected_mode,
"mode_label": _mode_label(mode),
"missing_config": missing,
"last_error": self._last_error,
"broker_id": st.get("经纪商代码", ""),
"td_address": st.get("交易服务器", ""),
}
def connect(self, mode: str, *, force: bool = False) -> None:
if not self._engine:
raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
if self._connected_mode == mode and not force:
return
setting = _setting_for_mode(mode)
if not setting.get("用户名") or not setting.get("密码"):
raise ValueError(
f"{_mode_label(mode)}:请在 .env 配置 "
f"{'SIMNOW_USER / SIMNOW_PASSWORD' if mode == 'simulation' else 'CTP_LIVE_USER / CTP_LIVE_PASSWORD'}"
)
if not setting.get("交易服务器"):
raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
with self._connect_lock:
if self._connected_mode and self._connected_mode != mode:
try:
self._engine.close()
except Exception:
pass
self._connected_mode = None
time.sleep(1)
self._engine.connect(setting, GATEWAY_NAME)
# 等待登录与结算信息
for _ in range(30):
accounts = self._engine.get_all_accounts()
if accounts:
self._connected_mode = mode
self._last_error = ""
logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
return
time.sleep(0.5)
self._last_error = "CTP 连接超时,请检查 SimNow 账号、前置地址与交易时段"
raise RuntimeError(self._last_error)
def ensure_connected(self, mode: str) -> None:
if self._connected_mode != mode:
self.connect(mode)
def get_account(self) -> dict[str, Any]:
if not self._engine:
return {}
accounts = self._engine.get_all_accounts()
if not accounts:
return {}
acc = accounts[0]
return {
"balance": float(getattr(acc, "balance", 0) or 0),
"available": float(getattr(acc, "available", 0) or 0),
"frozen": float(getattr(acc, "frozen", 0) or 0),
"accountid": getattr(acc, "accountid", ""),
}
def list_positions(self) -> list[dict[str, Any]]:
if not self._engine:
return []
out: list[dict[str, Any]] = []
for pos in self._engine.get_all_positions():
vol = int(getattr(pos, "volume", 0) or 0)
if vol <= 0:
continue
direction = getattr(pos, "direction", None)
d = "long"
if direction is not None and str(direction).endswith("SHORT"):
d = "short"
elif direction is not None and "" in str(direction):
d = "short"
sym = getattr(pos, "symbol", "") or ""
out.append({
"symbol": sym,
"direction": d,
"lots": vol,
"avg_price": float(getattr(pos, "price", 0) or 0),
"pnl": float(getattr(pos, "pnl", 0) or 0),
})
return out
def send_order(
self,
*,
ths_code: str,
offset: str,
direction: str,
lots: int,
price: float,
) -> str:
from vnpy.trader.constant import Direction, Offset, OrderType
from vnpy.trader.object import OrderRequest
if not self._engine:
raise RuntimeError("CTP 未初始化")
sym, ex_name = ths_to_vnpy_symbol(ths_code)
exchange = to_vnpy_exchange(ex_name)
lots = max(1, int(lots))
price = float(price)
offset = (offset or "open").lower()
direction = (direction or "long").lower()
if offset in ("open", "open_long", "open_short"):
d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT
off = Offset.OPEN
elif offset in ("close", "close_long", "close_short"):
# 平多 = 卖;平空 = 买
if direction == "long" or offset == "close_long":
d = Direction.SHORT
else:
d = Direction.LONG
off = Offset.CLOSE
else:
raise ValueError(f"未知开平: {offset}")
req = OrderRequest(
symbol=sym,
exchange=exchange,
direction=d,
type=OrderType.LIMIT,
volume=lots,
price=price,
offset=off,
)
vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
if not vt_orderid:
raise RuntimeError("CTP 拒单或未返回委托号")
return str(vt_orderid)
def get_bridge() -> CtpBridge:
global _bridge
with _bridge_lock:
if _bridge is None:
_bridge = CtpBridge()
return _bridge
def try_init_vnpy(_settings: dict | None = None) -> bool:
return get_bridge().available()
def vnpy_available() -> bool:
return get_bridge().available()
def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
b = get_bridge()
b.connect(mode, force=force)
return b.status(mode)
def ctp_status(mode: str) -> dict[str, Any]:
return get_bridge().status(mode)
def ctp_get_account(mode: str) -> dict[str, Any]:
b = get_bridge()
b.ensure_connected(mode)
return b.get_account()
def ctp_list_positions(mode: str) -> list[dict[str, Any]]:
b = get_bridge()
b.ensure_connected(mode)
return b.list_positions()
def get_ctp_balance(mode: str) -> Optional[float]:
try:
acc = ctp_get_account(mode)
bal = acc.get("balance")
return float(bal) if bal else None
except Exception as exc:
logger.debug("get_ctp_balance: %s", exc)
return None
def execute_order(
conn,
*,
mode: str,
offset: str,
symbol: str,
direction: str,
lots: int,
price: float,
settings: dict | None = None,
) -> dict[str, Any]:
"""统一下单:simulation=SimNowlive=期货公司 CTP。"""
del conn, settings
if mode not in ("simulation", "live"):
raise ValueError("未知交易模式")
if not vnpy_available():
raise ValueError(
"请先安装 vnpy 与 vnpy_ctppip install vnpy vnpy_ctp\n"
f"模拟盘需配置 .env 中 SIMNOW_USER / SIMNOW_PASSWORD 等"
)
b = get_bridge()
b.ensure_connected(mode)
order_id = b.send_order(
ths_code=symbol,
offset=offset,
direction=direction,
lots=lots,
price=price,
)
return {
"order_id": order_id,
"mode": mode,
"mode_label": _mode_label(mode),
"symbol": symbol,
"lots": lots,
"price": price,
}