6e423eebfb
新增 vnpy CTP 桥接、以损定仓/固定张数、趋势回调与滚仓策略、按资金推荐品种及交易风控;模拟盘走 SimNow,实盘预留期货公司配置。 Co-authored-by: Cursor <cursoragent@cursor.com>
58 lines
1.5 KiB
Python
58 lines
1.5 KiB
Python
"""同花顺合约代码 → vnpy Symbol + Exchange。"""
|
||
from __future__ import annotations
|
||
|
||
import re
|
||
from typing import Optional, Tuple
|
||
|
||
from symbols import ths_to_codes
|
||
|
||
try:
|
||
from vnpy.trader.constant import Exchange
|
||
except ImportError:
|
||
Exchange = None # type: ignore
|
||
|
||
# Exchange names this module accepts, keyed by the incoming exchange name and
# mapped to the name handed on to vnpy. Currently an identity mapping; callers
# use ``_EX_MAP.get(name, "SHFE")`` so any name absent here falls back to SHFE.
_EX_MAP = {name: name for name in ("SHFE", "DCE", "CZCE", "CFFEX", "INE")}
|
||
|
||
|
||
def ths_to_vnpy_symbol(ths_code: str) -> Tuple[str, str]:
    """Convert a THS contract code into a CTP/vnpy symbol and exchange name.

    The exchange is taken from the ``"ex"`` entry of ``ths_to_codes(code)``.
    When that lookup yields nothing, or names an exchange missing from
    ``_EX_MAP``, ``"SHFE"`` is used as the fallback.

    Symbol casing follows the CTP instrument-ID convention of the exchange:

    * CZCE: upper-case product code plus a 3-digit year/month; a longer
      digit run keeps only its last three digits (``2609`` -> ``609``).
    * CFFEX: upper-case product code, digits unchanged (``IF2606``).
    * Any other exchange: lower-case product code, digits unchanged
      (``rb2610``).

    Examples (assuming ``ths_to_codes`` resolves the exchanges as shown):
    ``rb2610`` -> ``("rb2610", "SHFE")``; ``SR609`` -> ``("SR609", "CZCE")``.

    Args:
        ths_code: Contract code such as ``"rb2610"``. ``None`` or blank
            input is treated as an empty string.

    Returns:
        ``(symbol, exchange_name)``. A code that is not "letters followed
        by digits" is returned stripped but otherwise unchanged.
    """
    code = (ths_code or "").strip()
    codes = ths_to_codes(code)
    ex = (codes.get("ex") if codes else None) or "SHFE"
    ex = _EX_MAP.get(ex, "SHFE")

    m = re.match(r"^([A-Za-z]+)(\d+)$", code)
    if not m:
        # Not a plain <product><digits> code; pass it through as-is.
        return code, ex

    letters, digits = m.groups()
    if ex == "CZCE":
        # Slicing already returns the whole string when it has < 3 digits.
        return letters.upper() + digits[-3:], ex
    if ex == "CFFEX":
        # CFFEX instrument IDs are upper-case on CTP; lower-casing them
        # produces a symbol the gateway does not recognise.
        return letters.upper() + digits, ex
    return letters.lower() + digits, ex
|
||
|
||
|
||
def to_vnpy_exchange(ex_name: str):
    """Resolve an exchange name to the corresponding vnpy ``Exchange`` member.

    Args:
        ex_name: Exchange name, matched case-insensitively. ``None`` or an
            empty string is treated as an unknown exchange.

    Returns:
        The ``Exchange`` member for SHFE, DCE, CZCE, CFFEX or INE.

    Raises:
        ImportError: ``Exchange`` is ``None`` (vnpy is not available).
        ValueError: ``ex_name`` is not one of the supported exchanges.
    """
    if Exchange is None:
        raise ImportError("vnpy 未安装")

    supported = ("SHFE", "DCE", "CZCE", "CFFEX", "INE")
    lookup = {name: getattr(Exchange, name) for name in supported}

    member = lookup.get((ex_name or "").upper())
    if member is not None:
        return member
    raise ValueError(f"未知交易所: {ex_name}")
|