b4250171d5
后台每 30s 检测并重连;以损定仓填止损后自动算手数;开仓/平仓按钮并排对齐。 Co-authored-by: Cursor <cursoragent@cursor.com>
416 lines
14 KiB
Python
416 lines
14 KiB
Python
"""CTP 执行层:模拟盘 → SimNow;实盘 → 期货公司(vnpy_ctp)。"""
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import threading
|
||
import time
|
||
from typing import Any, Optional
|
||
|
||
from locale_fix import ensure_process_locale
|
||
|
||
ensure_process_locale()
|
||
|
||
from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
GATEWAY_NAME = "CTP"
|
||
|
||
_bridge: Optional["CtpBridge"] = None
|
||
_bridge_lock = threading.Lock()
|
||
|
||
|
||
def _env(key: str, default: str = "") -> str:
|
||
return (os.getenv(key) or default).strip()
|
||
|
||
|
||
def _simnow_setting() -> dict[str, str]:
|
||
"""SimNow 仿真前置(可在 .env 覆盖)。看穿式前置需「柜台环境=实盘」。"""
|
||
return {
|
||
"用户名": _env("SIMNOW_USER"),
|
||
"密码": _env("SIMNOW_PASSWORD"),
|
||
"经纪商代码": _env("SIMNOW_BROKER_ID", "9999"),
|
||
"交易服务器": _env("SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201"),
|
||
"行情服务器": _env("SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211"),
|
||
"产品名称": _env("SIMNOW_APP_ID", "simnow_client_test"),
|
||
"授权编码": _env("SIMNOW_AUTH_CODE", "0000000000000000"),
|
||
"柜台环境": _env("SIMNOW_ENV", "实盘"),
|
||
}
|
||
|
||
|
||
def _live_setting() -> dict[str, str]:
|
||
return {
|
||
"用户名": _env("CTP_LIVE_USER"),
|
||
"密码": _env("CTP_LIVE_PASSWORD"),
|
||
"经纪商代码": _env("CTP_LIVE_BROKER_ID"),
|
||
"交易服务器": _env("CTP_LIVE_TD_ADDRESS"),
|
||
"行情服务器": _env("CTP_LIVE_MD_ADDRESS"),
|
||
"产品名称": _env("CTP_LIVE_APP_ID"),
|
||
"授权编码": _env("CTP_LIVE_AUTH_CODE"),
|
||
"柜台环境": _env("CTP_LIVE_ENV", "实盘"),
|
||
}
|
||
|
||
|
||
def _setting_for_mode(mode: str) -> dict[str, str]:
|
||
return _simnow_setting() if mode == "simulation" else _live_setting()
|
||
|
||
|
||
def _mode_label(mode: str) -> str:
|
||
return "SimNow 模拟" if mode == "simulation" else "期货公司实盘"
|
||
|
||
|
||
def _format_ctp_failure(ctp_logs: list[str]) -> str:
|
||
"""根据 CTP 网关日志拼出可读错误。"""
|
||
text = "\n".join(ctp_logs)
|
||
if "4097" in text or "Decrypt handshake" in text or "shake hand" in text.lower():
|
||
return (
|
||
"CTP 握手失败(4097):vnpy_ctp 与 SimNow 前置加密不匹配。"
|
||
"请执行 pip install -U vnpy vnpy_ctp 后重启,并确认 .env 中 SIMNOW_ENV=实盘"
|
||
)
|
||
if "不合法的登录" in text or "密码" in text or "账号" in text:
|
||
tail = ctp_logs[-1] if ctp_logs else ""
|
||
return f"CTP 登录被拒:{tail or '请检查投资者代码与密码(快期能否登录)'}"
|
||
if "连接断开" in text or "disconnect" in text.lower():
|
||
tail = ctp_logs[-1] if ctp_logs else ""
|
||
return f"CTP 连接断开:{tail or '请检查前置地址与网络'}"
|
||
if ctp_logs:
|
||
return f"CTP 连接失败:{ctp_logs[-1]}"
|
||
return "CTP 连接超时:未收到柜台回报。请检查 SimNow 账号、前置地址、网络(nc 测端口),并用快期验证账号"
|
||
|
||
|
||
class CtpBridge:
|
||
def __init__(self) -> None:
|
||
self._engine = None
|
||
self._ee = None
|
||
self._connected_mode: Optional[str] = None
|
||
self._last_error: str = ""
|
||
self._connect_lock = threading.Lock()
|
||
self._init_engine()
|
||
|
||
def _init_engine(self) -> None:
|
||
ensure_process_locale()
|
||
try:
|
||
from vnpy.event import EventEngine
|
||
from vnpy.trader.engine import MainEngine
|
||
from vnpy_ctp import CtpGateway
|
||
|
||
self._ee = EventEngine()
|
||
self._engine = MainEngine(self._ee)
|
||
self._engine.add_gateway(CtpGateway)
|
||
except ImportError:
|
||
self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
|
||
except Exception as exc:
|
||
self._last_error = str(exc)
|
||
|
||
def available(self) -> bool:
|
||
return self._engine is not None
|
||
|
||
@property
|
||
def last_error(self) -> str:
|
||
return self._last_error
|
||
|
||
@property
|
||
def connected_mode(self) -> Optional[str]:
|
||
return self._connected_mode
|
||
|
||
def status(self, mode: str) -> dict[str, Any]:
|
||
if self._connected_mode == mode:
|
||
self.ping()
|
||
st = _setting_for_mode(mode)
|
||
missing = [k for k in ("用户名", "密码", "交易服务器") if not st.get(k)]
|
||
return {
|
||
"vnpy_installed": self.available(),
|
||
"connected": self._connected_mode == mode,
|
||
"connected_mode": self._connected_mode,
|
||
"mode_label": _mode_label(mode),
|
||
"missing_config": missing,
|
||
"last_error": self._last_error,
|
||
"broker_id": st.get("经纪商代码", ""),
|
||
"td_address": st.get("交易服务器", ""),
|
||
}
|
||
|
||
def connect(self, mode: str, *, force: bool = False) -> None:
|
||
if not self._engine:
|
||
raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
|
||
if self._connected_mode == mode and not force:
|
||
if self.ping():
|
||
return
|
||
self._connected_mode = None
|
||
setting = _setting_for_mode(mode)
|
||
if not setting.get("用户名") or not setting.get("密码"):
|
||
raise ValueError(
|
||
f"{_mode_label(mode)}:请在 .env 配置 "
|
||
f"{'SIMNOW_USER / SIMNOW_PASSWORD' if mode == 'simulation' else 'CTP_LIVE_USER / CTP_LIVE_PASSWORD'}"
|
||
)
|
||
if not setting.get("交易服务器"):
|
||
raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
|
||
|
||
with self._connect_lock:
|
||
if self._connected_mode and self._connected_mode != mode:
|
||
try:
|
||
self._engine.close()
|
||
except Exception:
|
||
pass
|
||
self._connected_mode = None
|
||
time.sleep(1)
|
||
|
||
ctp_logs: list[str] = []
|
||
from vnpy.trader.event import EVENT_LOG
|
||
|
||
def _on_log(event) -> None:
|
||
msg = getattr(event.data, "msg", "") or str(event.data)
|
||
if msg:
|
||
ctp_logs.append(str(msg))
|
||
if len(ctp_logs) > 20:
|
||
ctp_logs.pop(0)
|
||
logger.info("CTP | %s", msg)
|
||
|
||
self._ee.register(EVENT_LOG, _on_log)
|
||
try:
|
||
ensure_process_locale()
|
||
logger.info(
|
||
"CTP 连接 [%s] user=%s td=%s env=%s",
|
||
mode,
|
||
setting.get("用户名"),
|
||
setting.get("交易服务器"),
|
||
setting.get("柜台环境", "实盘"),
|
||
)
|
||
self._engine.connect(setting, GATEWAY_NAME)
|
||
# 等待登录与结算信息(最多约 30 秒)
|
||
for _ in range(60):
|
||
accounts = self._engine.get_all_accounts()
|
||
if accounts:
|
||
self._connected_mode = mode
|
||
self._last_error = ""
|
||
logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
|
||
return
|
||
time.sleep(0.5)
|
||
finally:
|
||
self._ee.unregister(EVENT_LOG, _on_log)
|
||
|
||
hint = _format_ctp_failure(ctp_logs)
|
||
self._last_error = hint
|
||
raise RuntimeError(hint)
|
||
|
||
def ensure_connected(self, mode: str) -> None:
|
||
if self._connected_mode == mode and self.ping():
|
||
return
|
||
self.connect(mode)
|
||
|
||
def ping(self) -> bool:
|
||
"""检测连接是否仍有效;无效则清除 connected 状态。"""
|
||
if not self._engine or not self._connected_mode:
|
||
return False
|
||
try:
|
||
if self._engine.get_all_accounts():
|
||
return True
|
||
except Exception as exc:
|
||
logger.debug("CTP ping failed: %s", exc)
|
||
self._connected_mode = None
|
||
return False
|
||
|
||
def mark_disconnected(self) -> None:
|
||
self._connected_mode = None
|
||
|
||
def get_account(self) -> dict[str, Any]:
|
||
if not self._engine:
|
||
return {}
|
||
accounts = self._engine.get_all_accounts()
|
||
if not accounts:
|
||
return {}
|
||
acc = accounts[0]
|
||
return {
|
||
"balance": float(getattr(acc, "balance", 0) or 0),
|
||
"available": float(getattr(acc, "available", 0) or 0),
|
||
"frozen": float(getattr(acc, "frozen", 0) or 0),
|
||
"accountid": getattr(acc, "accountid", ""),
|
||
}
|
||
|
||
def list_positions(self) -> list[dict[str, Any]]:
|
||
if not self._engine:
|
||
return []
|
||
out: list[dict[str, Any]] = []
|
||
for pos in self._engine.get_all_positions():
|
||
vol = int(getattr(pos, "volume", 0) or 0)
|
||
if vol <= 0:
|
||
continue
|
||
direction = getattr(pos, "direction", None)
|
||
d = "long"
|
||
if direction is not None and str(direction).endswith("SHORT"):
|
||
d = "short"
|
||
elif direction is not None and "空" in str(direction):
|
||
d = "short"
|
||
sym = getattr(pos, "symbol", "") or ""
|
||
exchange = getattr(pos, "exchange", None)
|
||
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
|
||
out.append({
|
||
"symbol": sym,
|
||
"exchange": ex_name,
|
||
"direction": d,
|
||
"lots": vol,
|
||
"avg_price": float(getattr(pos, "price", 0) or 0),
|
||
"pnl": float(getattr(pos, "pnl", 0) or 0),
|
||
"frozen": int(getattr(pos, "frozen", 0) or 0),
|
||
})
|
||
return out
|
||
|
||
def send_order(
|
||
self,
|
||
*,
|
||
ths_code: str,
|
||
offset: str,
|
||
direction: str,
|
||
lots: int,
|
||
price: float,
|
||
order_type: str = "limit",
|
||
) -> str:
|
||
from vnpy.trader.constant import Direction, Offset, OrderType
|
||
from vnpy.trader.object import OrderRequest
|
||
|
||
if not self._engine:
|
||
raise RuntimeError("CTP 未初始化")
|
||
sym, ex_name = ths_to_vnpy_symbol(ths_code)
|
||
exchange = to_vnpy_exchange(ex_name)
|
||
lots = max(1, int(lots))
|
||
price = float(price)
|
||
|
||
offset = (offset or "open").lower()
|
||
direction = (direction or "long").lower()
|
||
|
||
if offset in ("open", "open_long", "open_short"):
|
||
d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT
|
||
off = Offset.OPEN
|
||
elif offset in ("close", "close_long", "close_short"):
|
||
# 平多 = 卖;平空 = 买
|
||
if direction == "long" or offset == "close_long":
|
||
d = Direction.SHORT
|
||
else:
|
||
d = Direction.LONG
|
||
off = Offset.CLOSE
|
||
else:
|
||
raise ValueError(f"未知开平: {offset}")
|
||
|
||
ot = OrderType.MARKET if (order_type or "limit").lower() == "market" else OrderType.LIMIT
|
||
|
||
req = OrderRequest(
|
||
symbol=sym,
|
||
exchange=exchange,
|
||
direction=d,
|
||
type=ot,
|
||
volume=lots,
|
||
price=price,
|
||
offset=off,
|
||
)
|
||
vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
|
||
if not vt_orderid:
|
||
raise RuntimeError("CTP 拒单或未返回委托号")
|
||
return str(vt_orderid)
|
||
|
||
|
||
def get_bridge() -> CtpBridge:
|
||
global _bridge
|
||
with _bridge_lock:
|
||
if _bridge is None:
|
||
_bridge = CtpBridge()
|
||
return _bridge
|
||
|
||
|
||
def try_init_vnpy(_settings: dict | None = None) -> bool:
|
||
return get_bridge().available()
|
||
|
||
|
||
def vnpy_available() -> bool:
|
||
return get_bridge().available()
|
||
|
||
|
||
def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
|
||
b = get_bridge()
|
||
b.connect(mode, force=force)
|
||
return b.status(mode)
|
||
|
||
|
||
def ctp_try_auto_reconnect(mode: str) -> bool:
|
||
"""断线时静默重连;已连接且 ping 正常则直接返回 True。"""
|
||
b = get_bridge()
|
||
if not b.available():
|
||
return False
|
||
st = _setting_for_mode(mode)
|
||
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
|
||
return False
|
||
if b.connected_mode == mode and b.ping():
|
||
return True
|
||
try:
|
||
b.connect(mode, force=True)
|
||
return b.connected_mode == mode
|
||
except Exception as exc:
|
||
logger.info("CTP 自动重连失败: %s", exc)
|
||
return False
|
||
|
||
|
||
def ctp_status(mode: str) -> dict[str, Any]:
|
||
return get_bridge().status(mode)
|
||
|
||
|
||
def ctp_get_account(mode: str) -> dict[str, Any]:
|
||
b = get_bridge()
|
||
b.ensure_connected(mode)
|
||
return b.get_account()
|
||
|
||
|
||
def ctp_list_positions(mode: str) -> list[dict[str, Any]]:
|
||
b = get_bridge()
|
||
b.ensure_connected(mode)
|
||
return b.list_positions()
|
||
|
||
|
||
def get_ctp_balance(mode: str) -> Optional[float]:
|
||
try:
|
||
acc = ctp_get_account(mode)
|
||
bal = acc.get("balance")
|
||
return float(bal) if bal else None
|
||
except Exception as exc:
|
||
logger.debug("get_ctp_balance: %s", exc)
|
||
return None
|
||
|
||
|
||
def execute_order(
|
||
conn,
|
||
*,
|
||
mode: str,
|
||
offset: str,
|
||
symbol: str,
|
||
direction: str,
|
||
lots: int,
|
||
price: float,
|
||
settings: dict | None = None,
|
||
order_type: str = "limit",
|
||
) -> dict[str, Any]:
|
||
"""统一下单:simulation=SimNow,live=期货公司 CTP。"""
|
||
del conn, settings
|
||
if mode not in ("simulation", "live"):
|
||
raise ValueError("未知交易模式")
|
||
if not vnpy_available():
|
||
raise ValueError(
|
||
"请先安装 vnpy 与 vnpy_ctp:pip install vnpy vnpy_ctp\n"
|
||
f"模拟盘需配置 .env 中 SIMNOW_USER / SIMNOW_PASSWORD 等"
|
||
)
|
||
b = get_bridge()
|
||
b.ensure_connected(mode)
|
||
order_id = b.send_order(
|
||
ths_code=symbol,
|
||
offset=offset,
|
||
direction=direction,
|
||
lots=lots,
|
||
price=price,
|
||
order_type=order_type,
|
||
)
|
||
return {
|
||
"order_id": order_id,
|
||
"mode": mode,
|
||
"mode_label": _mode_label(mode),
|
||
"symbol": symbol,
|
||
"lots": lots,
|
||
"price": price,
|
||
}
|