Files
dekun e5a586f903 Restructure into modules/ with single-process CTP and config/ layout.
Move business code under modules/, env template to config/, PM2 single qihuo process, and _legacy shims for old imports.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-07-01 14:42:16 +08:00

249 lines
7.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""
行情拉取:默认新浪(免费,普通用户可用)。
同花顺 iFinD HTTP 仅面向机构用户,需单独申请 token,可选开启。
"""
import os
import time
import json
import logging
from typing import Optional
import requests
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# iFinD HTTP endpoints: access-token exchange and real-time quotation.
THS_TOKEN_URL = "https://quantapi.51ifind.com/api/v1/get_access_token"
THS_QUOTE_URL = "https://quantapi.51ifind.com/api/v1/real_time_quotation"

# Futures exchange suffixes used in iFinD HTTP codes; every exchange maps to
# itself.
THS_EX_SUFFIX = {
    exchange: exchange
    for exchange in ("SHFE", "DCE", "CZCE", "CFFEX", "INE")
}

# In-process cache for the THS access token: the token itself, the epoch time
# after which it is treated as stale, and the refresh token it was issued for.
_token_cache: dict = dict(token="", expires=0.0, refresh="")
def _quote_source() -> str:
    """Return the configured quote source, trimmed and lower-cased.

    Reads the ``QUOTE_SOURCE`` environment variable and falls back to
    ``"sina"`` when the variable is not set.
    """
    configured = os.environ.get("QUOTE_SOURCE", "sina")
    trimmed = configured.strip()
    return trimmed.lower()
def _has_ths_token() -> bool:
    """Tell whether a non-empty THS refresh token is currently available."""
    refresh_token = _get_refresh_token()
    return True if refresh_token else False
def get_quote_source_label(*, ctp_connected: bool = False) -> str:
    """Return the quote-source description shown in the UI.

    Args:
        ctp_connected: When true, the CTP label is returned and the
            configured quote source is not consulted.

    Returns:
        A user-facing label (in Chinese) describing where quotes come from.
    """
    if ctp_connected:
        return "CTP 柜台(已连接)"

    sina_label = "新浪(CTP 未连接时备用)"
    source = _quote_source()
    if source == "sina":
        return sina_label
    if source == "ths":
        # The "token not configured" label previously lacked its closing
        # full-width bracket.
        return "同花顺 iFinD" if _has_ths_token() else "同花顺(未配置 token)"
    # get_price() only tries THS before Sina in "auto" mode with a token; any
    # other (unrecognised or empty) value is served by Sina alone, so the
    # label must not advertise THS for it.
    if source == "auto" and _has_ths_token():
        return "同花顺优先,失败回退新浪"
    return sina_label
def _sina_headers() -> dict:
    """Build the HTTP headers sent with Sina quote requests."""
    headers = {}
    headers["Referer"] = "https://finance.sina.com.cn"
    return headers
def _parse_sina_futures_quote(parts: list) -> Optional[dict]:
    """Parse the comma-split fields of a Sina nf_/CFF_RE_ futures quote.

    Args:
        parts: The quote fields, already split on commas.

    Returns:
        ``None`` when fewer than 9 fields are present; otherwise a dict with
        ``name`` (field 0), ``price`` (first strictly positive value among
        fields 8, 7, 6, 5, else ``0.0``), ``volume`` (field 14, else ``0.0``),
        ``open_interest`` (field 13, else ``0.0``) and ``prev_close``
        (field 9, else ``None``).
    """
    if len(parts) < 9:
        return None

    def _number_at(pos: int) -> Optional[float]:
        # A field that is missing, empty, or not a valid float yields None.
        if not (len(parts) > pos and parts[pos]):
            return None
        try:
            return float(parts[pos])
        except ValueError:
            return None

    price = 0.0
    for pos in (8, 7, 6, 5):
        candidate = _number_at(pos)
        if candidate is not None and candidate > 0:
            price = candidate
            break

    open_interest = _number_at(13)
    volume = _number_at(14)
    prev_close = _number_at(9)

    return {
        "name": parts[0],
        "price": price,
        "volume": 0.0 if volume is None else volume,
        "open_interest": 0.0 if open_interest is None else open_interest,
        "prev_close": prev_close,
    }
def _fetch_sina_raw(sina_code: str) -> Optional[dict]:
    """Fetch one quote from hq.sinajs.cn and parse it.

    Args:
        sina_code: The Sina code placed after ``list=`` in the request URL.

    Returns:
        The dict produced by ``_parse_sina_futures_quote``, or ``None`` when
        the response has no double-quoted payload, the payload is empty, or
        any exception occurs (the failure is logged at debug level).
    """
    try:
        response = requests.get(
            f"https://hq.sinajs.cn/list={sina_code}",
            headers=_sina_headers(),
            timeout=5,
        )
        response.encoding = "gbk"
        text = response.text
        # The payload is whatever sits between the first and second
        # double quote of the response text.
        _, opening_quote, remainder = text.partition('"')
        if not opening_quote:
            return None
        payload = remainder.partition('"')[0]
        if not payload:
            return None
        return _parse_sina_futures_quote(payload.split(","))
    except Exception as exc:
        logger.debug("sina fetch failed %s: %s", sina_code, exc)
        return None
def get_sina_price(sina_code: str) -> Optional[float]:
    """Return the ``price`` field of the Sina quote for *sina_code*.

    Returns ``None`` when no quote could be fetched.
    """
    quote = _fetch_sina_raw(sina_code)
    if not quote:
        return None
    return quote["price"]
# Refresh token supplied at runtime; empty until set_ths_refresh_token is
# called with a non-blank value.
_runtime_refresh_token: str = ""


def set_ths_refresh_token(token: str):
    """Store a THS refresh token for this process.

    Args:
        token: The refresh token; surrounding whitespace is removed, and a
            falsy value (``None`` or ``""``) clears the stored token.
    """
    global _runtime_refresh_token
    cleaned = token.strip() if token else ""
    _runtime_refresh_token = cleaned
def _get_refresh_token() -> str:
    """Return the THS refresh token to use.

    The runtime-set token wins when it is non-empty; otherwise the
    ``THS_REFRESH_TOKEN`` environment variable is used, trimmed, defaulting
    to an empty string.
    """
    runtime_token = _runtime_refresh_token
    if not runtime_token:
        return os.environ.get("THS_REFRESH_TOKEN", "").strip()
    return runtime_token
def _get_ths_access_token(refresh_token: str) -> Optional[str]:
    """Exchange a THS refresh token for an access token, with caching.

    Args:
        refresh_token: The refresh token to exchange.

    Returns:
        The access token, or ``None`` when *refresh_token* is empty, the API
        reports a non-zero ``errorcode``, or the request fails. A cached
        token is reused while it was issued for the same refresh token and
        has not reached its recorded expiry.
    """
    if not refresh_token:
        return None

    now = time.time()
    cache_usable = (
        _token_cache["token"]
        and _token_cache["refresh"] == refresh_token
        and now < _token_cache["expires"]
    )
    if cache_usable:
        return _token_cache["token"]

    request_headers = {
        "Content-Type": "application/json",
        "refresh_token": refresh_token,
    }
    try:
        payload = requests.post(
            THS_TOKEN_URL, headers=request_headers, timeout=10
        ).json()
        if payload.get("errorcode") != 0:
            logger.warning("THS token error: %s", payload.get("errmsg"))
            return None
        access = payload["data"]["access_token"]
        # Cache entries are treated as valid for six hours from the moment
        # the request was started.
        _token_cache.update(
            token=access,
            refresh=refresh_token,
            expires=now + 3600 * 6,
        )
        return access
    except Exception as exc:
        logger.warning("THS token request failed: %s", exc)
        return None
def _parse_ths_quote(data: dict) -> Optional[float]:
    """Extract the latest price from a THS real-time quotation response.

    Each entry of ``data["tables"]`` is inspected in order; within its
    ``"table"`` mapping the keys ``latest``, ``new``, ``close``, ``trade``
    and ``last`` are tried in that order. A list value contributes its first
    element. When no table yields a price and ``data["data"]`` is a dict,
    that nested dict is parsed the same way.

    Args:
        data: The decoded JSON response (or a nested ``data`` section of it).

    Returns:
        The first usable value converted to ``float``, or ``None`` when
        nothing usable is found or the structure is not as expected.
    """
    try:
        for table in data.get("tables") or []:
            fields = table.get("table") or {}
            for key in ("latest", "new", "close", "trade", "last"):
                val = fields.get(key)
                if isinstance(val, list):
                    val = val[0] if val else None
                # Skip null, empty, or non-scalar candidates instead of
                # letting one bad value abort the whole parse: the remaining
                # keys, tables and the nested section may still hold a price.
                if not isinstance(val, (int, float, str)) or str(val) == "":
                    continue
                try:
                    return float(val)
                except ValueError:
                    continue
        # Some responses nest the payload under a "data" field.
        nested = data.get("data")
        if isinstance(nested, dict):
            return _parse_ths_quote(nested)
    except Exception as exc:
        logger.debug("parse ths quote failed: %s", exc)
    return None
def get_ths_price(ths_full_code: str, refresh_token: str = "") -> Optional[float]:
    """Fetch the latest price for one contract from the THS quote endpoint.

    Args:
        ths_full_code: Full THS code, e.g. ``ag2608.SHFE`` or
            ``IF2606.CFFEX``.
        refresh_token: Refresh token to use; when empty, the token from
            ``_get_refresh_token`` is used instead.

    Returns:
        The parsed price, or ``None`` when no access token is available, the
        API reports a non-zero ``errorcode``, or the request fails.
    """
    effective_refresh = refresh_token if refresh_token else _get_refresh_token()
    access = _get_ths_access_token(effective_refresh)
    if not access:
        return None

    request_headers = {
        "Content-Type": "application/json",
        "access_token": access,
    }
    request_body = {"codes": ths_full_code, "indicators": "latest"}
    try:
        payload = requests.post(
            THS_QUOTE_URL,
            headers=request_headers,
            json=request_body,
            timeout=10,
        ).json()
        if payload.get("errorcode") == 0:
            return _parse_ths_quote(payload)
        logger.warning("THS quote error %s: %s", ths_full_code, payload.get("errmsg"))
        return None
    except Exception as exc:
        logger.warning("THS quote failed %s: %s", ths_full_code, exc)
        return None
def get_price(market_code: str, sina_fallback: str = "") -> Optional[float]:
    """
    Unified entry point for fetching a price.

    sina_fallback: Sina code such as nf_AG2608 (the default path for regular users).
    market_code: full THS code such as ag2608.SHFE (only when an institutional token is usable).

    Returns the price, or None when no source produced one.
    """
    source = _quote_source()
    # THS is attempted when the source is "ths", or when it is "auto" and a
    # refresh token is available.
    use_ths = source == "ths" or (source == "auto" and _has_ths_token())
    # Only codes containing a "." are sent to THS.
    if use_ths and market_code and "." in market_code:
        price = get_ths_price(market_code)
        if price is not None:
            return price
        # NOTE(review): the nesting of this check was reconstructed from a
        # view without indentation — confirm it belongs inside the THS
        # branch (strict "ths" mode gives up without trying Sina).
        if source == "ths":
            return None
    if sina_fallback:
        return get_sina_price(sina_fallback)
    # A market_code that already looks like a Sina code is queried directly.
    if market_code.startswith("nf_") or market_code.startswith("CFF_RE_"):
        return get_sina_price(market_code)
    return None
def fetch_raw_for_volume(sina_code: str) -> Optional[dict]:
    """Fetch the raw Sina quote dict for *sina_code*.

    Used by the main-contract scan (which needs volume); always goes through
    Sina.
    """
    quote = _fetch_sina_raw(sina_code)
    return quote