fix: CTP 连接捕获 SimNow 真实报错并增加诊断脚本
显式设置柜台环境=实盘;连接失败时解析 4097/登录拒单;scripts/test_simnow.py 供服务器排查。 Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,68 @@
|
||||
#!/usr/bin/env python3
|
||||
"""SimNow CTP 连接诊断(在服务器 venv 中运行)。"""
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import socket
|
||||
import sys
|
||||
|
||||
BASE = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
sys.path.insert(0, BASE)
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(os.path.join(BASE, ".env"))
|
||||
|
||||
|
||||
def _probe(host_port: str) -> str:
|
||||
s = host_port.replace("tcp://", "").strip()
|
||||
if ":" not in s:
|
||||
return "invalid"
|
||||
host, port_s = s.rsplit(":", 1)
|
||||
try:
|
||||
port = int(port_s)
|
||||
sock = socket.create_connection((host, port), timeout=5)
|
||||
sock.close()
|
||||
return "ok"
|
||||
except OSError as exc:
|
||||
return str(exc)
|
||||
|
||||
|
||||
def main() -> int:
|
||||
user = os.getenv("SIMNOW_USER", "")
|
||||
td = os.getenv("SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201")
|
||||
md = os.getenv("SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211")
|
||||
env = os.getenv("SIMNOW_ENV", "实盘")
|
||||
|
||||
print("=== SimNow 配置 ===")
|
||||
print(f"SIMNOW_USER = {user or '(未设置)'}")
|
||||
print(f"SIMNOW_PASSWORD = {'*' * 8 if os.getenv('SIMNOW_PASSWORD') else '(未设置)'}")
|
||||
print(f"SIMNOW_TD = {td}")
|
||||
print(f"SIMNOW_MD = {md}")
|
||||
print(f"SIMNOW_ENV = {env}")
|
||||
print()
|
||||
print("=== 端口探测 ===")
|
||||
print(f"TD {td} -> {_probe(td)}")
|
||||
print(f"MD {md} -> {_probe(md)}")
|
||||
print()
|
||||
|
||||
if not user or not os.getenv("SIMNOW_PASSWORD"):
|
||||
print("错误:请在 .env 填写 SIMNOW_USER / SIMNOW_PASSWORD")
|
||||
return 1
|
||||
|
||||
print("=== CTP 登录测试 ===")
|
||||
try:
|
||||
from vnpy_bridge import ctp_connect
|
||||
|
||||
st = ctp_connect("simulation", force=True)
|
||||
acc = st.get("broker_id")
|
||||
print("连接成功")
|
||||
print(st)
|
||||
return 0
|
||||
except Exception as exc:
|
||||
print(f"连接失败: {exc}")
|
||||
return 2
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user