Files
qihuo/scripts/test_simnow.py
T
2026-06-24 11:42:15 +08:00

77 lines
2.2 KiB
Python

#!/usr/bin/env python3
"""SimNow CTP connection diagnostics (run inside the server's venv)."""
from __future__ import annotations
import os
import socket
import sys
# BASE is the parent of the directory that contains this script.
BASE = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Put BASE first on the import path before the non-stdlib imports below.
# NOTE(review): presumably locale_fix and vnpy_bridge live in BASE -- confirm.
sys.path.insert(0, BASE)
from locale_fix import ensure_process_locale, missing_ctp_locales
# Runs at import time, before the remaining imports are executed.
ensure_process_locale()
from dotenv import load_dotenv
# Read BASE/.env so the SIMNOW_* values looked up via os.getenv are available.
load_dotenv(os.path.join(BASE, ".env"))
def _probe(host_port: str) -> str:
s = host_port.replace("tcp://", "").strip()
if ":" not in s:
return "invalid"
host, port_s = s.rsplit(":", 1)
try:
port = int(port_s)
sock = socket.create_connection((host, port), timeout=5)
sock.close()
return "ok"
except OSError as exc:
return str(exc)
def main() -> int:
    """Print the SimNow configuration, probe both fronts, then try a CTP login.

    Returns:
        0 when the login call completes, 1 when SIMNOW_USER or
        SIMNOW_PASSWORD is missing, 2 when the login attempt raises.
    """
    account = os.getenv("SIMNOW_USER", "")
    trade_front = os.getenv("SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201")
    market_front = os.getenv("SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211")
    environment = os.getenv("SIMNOW_ENV", "实盘")

    # --- configuration summary ---
    print("=== SimNow 配置 ===")
    print(f"locale = {ensure_process_locale()}")
    absent_locales = missing_ctp_locales()
    if absent_locales:
        print(f"警告: 缺少 CTP 所需 locale: {', '.join(absent_locales)}")
    has_password = bool(os.getenv("SIMNOW_PASSWORD"))
    # Never echo the password itself; show a fixed-width mask instead.
    password_display = "*" * 8 if has_password else "(未设置)"
    print(f"SIMNOW_USER = {account or '(未设置)'}")
    print(f"SIMNOW_PASSWORD = {password_display}")
    print(f"SIMNOW_TD = {trade_front}")
    print(f"SIMNOW_MD = {market_front}")
    print(f"SIMNOW_ENV = {environment}")
    print()

    # --- raw TCP reachability, trading front first ---
    print("=== 端口探测 ===")
    for label, address in (("TD", trade_front), ("MD", market_front)):
        print(f"{label} {address} -> {_probe(address)}")
    print()

    if not (account and has_password):
        print("错误:请在 .env 填写 SIMNOW_USER / SIMNOW_PASSWORD")
        return 1

    # --- actual login through the bridge ---
    print("=== CTP 登录测试 ===")
    try:
        # Imported lazily so the checks above still run if the bridge
        # cannot be imported; an import failure is reported as a failed login.
        from vnpy_bridge import ctp_connect

        status = ctp_connect("simulation", force=True)
        # The value is not used, but the lookup is kept: it raises (and is
        # reported as a failure) when the result has no ``get`` method.
        _broker_id = status.get("broker_id")
        print("连接成功")
        print(status)
        return 0
    except Exception as exc:
        print(f"连接失败: {exc}")
        return 2
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)