Files
qihuo/ctp_symbol.py
T
dekun 6e423eebfb 接入 SimNow 模拟盘与期货下单、策略及品种推荐功能。
新增 vnpy CTP 桥接、以损定仓/固定张数、趋势回调与滚仓策略、按资金推荐品种及交易风控;模拟盘走 SimNow,实盘预留期货公司配置。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-24 10:04:37 +08:00

58 lines
1.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""同花顺合约代码 → vnpy Symbol + Exchange。"""
from __future__ import annotations
import re
from typing import Optional, Tuple
from symbols import ths_to_codes
try:
from vnpy.trader.constant import Exchange
except ImportError:
Exchange = None # type: ignore
# Exchange names recognised by this module. Every name maps to itself, so the
# dict works as a whitelist: ths_to_vnpy_symbol() looks the upstream exchange
# name up here and falls back to "SHFE" when it is not listed.
_EX_MAP = {name: name for name in ("SHFE", "DCE", "CZCE", "CFFEX", "INE")}
def ths_to_vnpy_symbol(ths_code: str) -> Tuple[str, str]:
    """Convert a Tonghuashun (THS) contract code to a CTP symbol and exchange name.

    The exchange name is read from the ``"ex"`` entry of
    ``ths_to_codes(code)``; a missing, empty or unlisted value falls back to
    ``"SHFE"``.

    Symbol formatting depends on the exchange:

    * CZCE:  upper-case letters + last three digits (``SR2609`` -> ``SR609``).
    * CFFEX: upper-case letters, digits unchanged (``if2606`` -> ``IF2606``).
    * other: lower-case letters, digits unchanged (``RB2610`` -> ``rb2610``).

    Args:
        ths_code: THS contract code. ``None`` or an empty value is treated as
            ``""``; surrounding whitespace is stripped.

    Returns:
        A ``(symbol, exchange_name)`` tuple, where ``exchange_name`` is one of
        the keys of ``_EX_MAP``. A code that is not "letters followed by
        digits" is returned stripped but otherwise unchanged.
    """
    code = (ths_code or "").strip()
    codes = ths_to_codes(code)
    ex = (codes.get("ex") if codes else None) or "SHFE"
    ex = _EX_MAP.get(ex, "SHFE")

    m = re.match(r"^([A-Za-z]+)(\d+)$", code)
    if not m:
        # Not a plain <product><yymm> code: pass it through untouched.
        return code, ex

    letters, digits = m.group(1), m.group(2)
    if ex == "CZCE":
        # CZCE uses upper-case product codes with a 3-digit year-month on CTP
        # (e.g. SR509). Slicing keeps shorter digit runs intact.
        sym = letters.upper() + digits[-3:]
    elif ex == "CFFEX":
        # CFFEX instrument IDs are upper-case on CTP (IF2606, T2609);
        # lower-casing them would yield a symbol that matches no contract.
        sym = letters.upper() + digits
    else:
        # SHFE / DCE / INE use lower-case product codes.
        sym = letters.lower() + digits
    return sym, ex
def to_vnpy_exchange(ex_name: str):
    """Return the vnpy ``Exchange`` member for an exchange name.

    The lookup is case-insensitive and covers SHFE, DCE, CZCE, CFFEX and INE.

    Args:
        ex_name: Exchange name; ``None`` or an empty value is treated as ``""``.

    Returns:
        The ``Exchange`` attribute whose name equals the upper-cased
        ``ex_name``.

    Raises:
        ImportError: If vnpy could not be imported (``Exchange`` is ``None``).
        ValueError: If ``ex_name`` is not one of the supported names.
    """
    if Exchange is None:
        raise ImportError("vnpy 未安装")

    supported = ("SHFE", "DCE", "CZCE", "CFFEX", "INE")
    table = {name: getattr(Exchange, name) for name in supported}

    key = (ex_name or "").upper()
    if key not in table:
        raise ValueError(f"未知交易所: {ex_name}")
    return table[key]