Files
qihuo/modules/ctp/ctp_symbol.py
T
dekun e5a586f903 Restructure into modules/ with single-process CTP and config/ layout.
Move business code under modules/, env template to config/, PM2 single qihuo process, and _legacy shims for old imports.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-07-01 14:42:16 +08:00

67 lines
2.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""同花顺合约代码 → vnpy Symbol + Exchange。"""
from __future__ import annotations
import re
from typing import Optional, Tuple
from modules.core.symbols import ths_to_codes
try:
from vnpy.trader.constant import Exchange
except ImportError:
Exchange = None # type: ignore
# Exchange names accepted by this module, mapped to the canonical name that is
# returned to callers. Currently an identity table; any name that is not a key
# here is treated as unknown by the lookup in ths_to_vnpy_symbol.
_EX_MAP = dict(
    SHFE="SHFE",
    DCE="DCE",
    CZCE="CZCE",
    CFFEX="CFFEX",
    INE="INE",
)
def ths_to_vnpy_symbol(ths_code: str) -> Tuple[str, str]:
    """Convert a Tonghuashun (THS) contract code to a (symbol, exchange name) pair.

    The exchange is read from the ``"ex"`` entry of ``ths_to_codes(code)``.
    When that entry is missing or empty, the text after the last ``.`` of the
    ``"market_code"`` entry is used instead. The name is stripped and
    upper-cased before it is looked up in ``_EX_MAP`` so that a value such as
    ``"czce"`` is not silently treated as SHFE; this mirrors the
    case-insensitive lookup in ``to_vnpy_exchange``. A missing or unrecognised
    exchange falls back to ``"SHFE"``.

    Args:
        ths_code: Contract code such as ``"rb2610"``. ``None`` or an empty
            string is treated as ``""``; surrounding whitespace is ignored.

    Returns:
        ``(symbol, exchange_name)`` where ``exchange_name`` is a value of
        ``_EX_MAP``. Codes that are not "letters followed by digits" are
        returned stripped but otherwise unchanged. For CZCE the letters are
        upper-cased and only the last three digits are kept; for every other
        exchange the letters are lower-cased and the digits kept as-is.

    Examples (assuming ``ths_to_codes`` resolves these to the exchanges shown):
        ``rb2610`` -> ``("rb2610", "SHFE")``
        ``SR609``  -> ``("SR609", "CZCE")``
    """
    code = (ths_code or "").strip()
    codes = ths_to_codes(code)

    ex = codes.get("ex") if codes else None
    if not ex and codes:
        # No explicit exchange: fall back to the suffix of "<code>.<exchange>".
        market_code = codes.get("market_code") or ""
        if "." in market_code:
            ex = market_code.rsplit(".", 1)[-1]

    # Normalise before the table lookup; without this, a lower-case or padded
    # exchange name would miss _EX_MAP and be reported as SHFE.
    if isinstance(ex, str):
        ex = ex.strip().upper()
    ex = _EX_MAP.get(ex or "SHFE", "SHFE")

    m = re.match(r"^([A-Za-z]+)(\d+)$", code)
    if not m:
        # Not a plain "product + delivery month" code; pass it through as-is.
        return code, ex

    letters, digits = m.group(1), m.group(2)
    if ex == "CZCE":
        # CZCE symbols on CTP are upper-case with a 3-digit year-month
        # (e.g. SR509); a 4-digit suffix is truncated to its last 3 digits.
        # Slicing already returns shorter digit strings unchanged.
        sym = letters.upper() + digits[-3:]
    else:
        sym = letters.lower() + digits
    return sym, ex
def to_vnpy_exchange(ex_name: str):
    """Resolve an exchange name to the matching vnpy ``Exchange`` member.

    The lookup is case-insensitive and covers SHFE, DCE, CZCE, CFFEX and INE.

    Args:
        ex_name: Exchange name, e.g. ``"SHFE"`` or ``"czce"``. ``None`` or an
            empty string is treated as ``""`` and therefore rejected.

    Returns:
        The ``Exchange`` member for ``ex_name``.

    Raises:
        ImportError: vnpy could not be imported when this module was loaded.
        ValueError: ``ex_name`` is not one of the supported exchanges.
    """
    if Exchange is None:
        raise ImportError("vnpy 未安装")

    supported = dict(
        SHFE=Exchange.SHFE,
        DCE=Exchange.DCE,
        CZCE=Exchange.CZCE,
        CFFEX=Exchange.CFFEX,
        INE=Exchange.INE,
    )
    wanted = (ex_name or "").upper()
    resolved = supported.get(wanted)
    if resolved is not None:
        return resolved
    raise ValueError(f"未知交易所: {ex_name}")