5a6c89c662
Co-authored-by: Cursor <cursoragent@cursor.com>
80 lines
2.4 KiB
Python
80 lines
2.4 KiB
Python
#!/usr/bin/env python3
|
|
"""SimNow CTP 连接诊断(在服务器 venv 中运行)。"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import socket
|
|
import sys
|
|
|
|
BASE = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
sys.path.insert(0, BASE)
|
|
|
|
from locale_fix import ensure_process_locale, missing_ctp_locales
|
|
|
|
ensure_process_locale()
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
load_dotenv(os.path.join(BASE, ".env"))
|
|
|
|
|
|
def _probe(host_port: str) -> str:
|
|
s = host_port.replace("tcp://", "").strip()
|
|
if ":" not in s:
|
|
return "invalid"
|
|
host, port_s = s.rsplit(":", 1)
|
|
try:
|
|
port = int(port_s)
|
|
sock = socket.create_connection((host, port), timeout=5)
|
|
sock.close()
|
|
return "ok"
|
|
except OSError as exc:
|
|
return str(exc)
|
|
|
|
|
|
def main() -> int:
|
|
from ctp_settings import resolve_ctp_value
|
|
|
|
user = resolve_ctp_value("simnow_user", "SIMNOW_USER")
|
|
pwd = resolve_ctp_value("simnow_password", "SIMNOW_PASSWORD")
|
|
td = resolve_ctp_value("simnow_td_address", "SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201")
|
|
md = resolve_ctp_value("simnow_md_address", "SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211")
|
|
env = resolve_ctp_value("simnow_env", "SIMNOW_ENV", "实盘")
|
|
|
|
print("=== SimNow 配置(系统设置优先)===")
|
|
print(f"locale = {ensure_process_locale()}")
|
|
missing = missing_ctp_locales()
|
|
if missing:
|
|
print(f"警告: 缺少 CTP 所需 locale: {', '.join(missing)}")
|
|
print(f"SIMNOW_USER = {user or '(未设置)'}")
|
|
print(f"SIMNOW_PASSWORD = {'*' * 8 if pwd else '(未设置)'}")
|
|
print(f"SIMNOW_TD = {td}")
|
|
print(f"SIMNOW_MD = {md}")
|
|
print(f"SIMNOW_ENV = {env}")
|
|
print()
|
|
print("=== 端口探测 ===")
|
|
print(f"TD {td} -> {_probe(td)}")
|
|
print(f"MD {md} -> {_probe(md)}")
|
|
print()
|
|
|
|
if not user or not pwd:
|
|
print("错误:请在系统设置或 .env 填写 SimNow 投资者代码与密码")
|
|
return 1
|
|
|
|
print("=== CTP 登录测试 ===")
|
|
try:
|
|
from vnpy_bridge import ctp_connect
|
|
|
|
st = ctp_connect("simulation", force=True)
|
|
acc = st.get("broker_id")
|
|
print("连接成功")
|
|
print(st)
|
|
return 0
|
|
except Exception as exc:
|
|
print(f"连接失败: {exc}")
|
|
return 2
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|