b77f30b3ff
Co-authored-by: Cursor <cursoragent@cursor.com>
276 lines
8.2 KiB
Python
276 lines
8.2 KiB
Python
"""期货合约简介:东方财富 / 新浪 / AKShare。"""
|
|
import logging
|
|
import re
|
|
from typing import Any, Optional
|
|
|
|
import requests
|
|
|
|
from contract_specs import get_contract_spec
|
|
from symbols import ths_to_codes, search_symbols
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Eastmoney info-payload key -> Chinese display label used in profile rows.
# Iteration order is the order fields are copied out of the payload by
# _fetch_em_direct.
EM_LABEL_MAP = {
    "vname": "交易品种",  # traded product
    "vcode": "交易代码",  # trading code
    "jydw": "交易单位",  # trading unit
    "bjdw": "报价单位",  # quote unit
    "market": "交易所",  # exchange
    "zxbddw": "最小变动价位",  # minimum price tick
    "zdtbfd": "涨跌停幅度",  # price limit range
    "hyjgyf": "合约月份",  # contract months
    "jysj": "交易时间",  # trading hours
    "zhjyr": "最后交易日",  # last trading day
    "zhjgr": "交割日期",  # delivery date
    "jgpj": "交割品级",  # deliverable grade
    "zcjybzj": "最低交易保证金",  # minimum trading margin
    "jgfs": "交割方式",  # delivery method
    "jgdd": "交割地点",  # delivery location
    "ssrq": "上市日期",  # listing date
}
|
|
|
|
# Preferred top-to-bottom order of profile rows; labels that are not listed
# here are appended after these by _rows_from_dict.
DISPLAY_ORDER = [
    "交易品种",  # traded product
    "交易代码",  # trading code
    "交易单位",  # trading unit
    "报价单位",  # quote unit
    "最小变动价位",  # minimum price tick
    "最低交易保证金",  # minimum trading margin
    "涨跌停幅度",  # price limit range
    "合约月份",  # contract months
    "交易时间",  # trading hours
    "最后交易日",  # last trading day
    "交割日期",  # delivery date
    "交割方式",  # delivery method
    "交割地点",  # delivery location
    "交割品级",  # deliverable grade
    "上市日期",  # listing date
    "交易所",  # exchange
]
|
|
|
|
# Stripped string forms that _clean_value treats as "no value".
SKIP_ITEMS = {
    "",
    "-",
    "None",
    "nan",
    "null",
}
|
|
|
|
|
|
def _normalize_ths_code(raw: str) -> Optional[str]:
|
|
code = (raw or "").strip()
|
|
if not code:
|
|
return None
|
|
# 已是完整合约
|
|
if re.match(r"^[A-Za-z]+\d{3,4}$", code):
|
|
return code
|
|
# 仅品种字母时尝试匹配主力
|
|
results = search_symbols(code)
|
|
if results:
|
|
return results[0].get("ths_code") or code
|
|
codes = ths_to_codes(code)
|
|
if codes:
|
|
return codes["ths_code"]
|
|
return code
|
|
|
|
|
|
def _to_sina_quote_symbol(ths_code: str) -> str:
|
|
m = re.match(r"^([A-Za-z]+)(\d+)$", ths_code.strip())
|
|
if not m:
|
|
return ths_code.upper()
|
|
return m.group(1).upper() + m.group(2)
|
|
|
|
|
|
def _to_em_page_symbol(ths_code: str) -> str:
|
|
return ths_code.strip().lower() + "F"
|
|
|
|
|
|
def _clean_value(val: Any) -> str:
    """Return ``val`` as a stripped string, or "" when it carries no value.

    ``None`` and any value whose stripped string form is listed in
    ``SKIP_ITEMS`` collapse to the empty string.
    """
    text = "" if val is None else str(val).strip()
    return "" if text in SKIP_ITEMS else text
|
|
|
|
|
|
def _rows_from_dict(data: dict[str, str]) -> list[dict]:
    """Convert a label->value mapping into ordered display rows.

    Labels listed in ``DISPLAY_ORDER`` come first, in that order, each with
    the cleaned value of its ``"<label>_hint"`` entry (or "" if absent).
    Remaining labels follow in the mapping's own order with an empty hint.
    Entries whose cleaned value is empty, and the ``*_hint`` keys themselves,
    never become rows.
    """
    ordered: list[dict] = []
    emitted: set[str] = set()
    for name in DISPLAY_ORDER:
        text = _clean_value(data.get(name))
        if text:
            ordered.append(
                {
                    "label": name,
                    "value": text,
                    "hint": _clean_value(data.get(f"{name}_hint")),
                }
            )
            emitted.add(name)

    # Labels outside DISPLAY_ORDER keep their insertion order and get no hint.
    extras = [
        {"label": name, "value": text, "hint": ""}
        for name, raw in data.items()
        if not name.endswith("_hint")
        and name not in emitted
        and (text := _clean_value(raw))
    ]
    return ordered + extras
|
|
|
|
|
|
def _add_computed_hints(ths_code: str, data: dict[str, str]) -> None:
    """Add a per-lot tick-value hint to ``data`` in place when computable.

    Reads the contract multiplier (``"mult"``) from ``get_contract_spec`` and
    the first numeric-looking run in ``data["最小变动价位"]``; when both are
    usable, stores ``data["最小变动价位_hint"]`` with tick * multiplier rounded
    to two decimals. If anything is missing or unparsable the function leaves
    ``data`` untouched instead of raising.
    """
    # Tolerate a lookup that returns nothing for an unknown contract.
    spec = get_contract_spec(ths_code) or {}
    mult = spec.get("mult") or 0
    if not mult:
        return

    m = re.search(r"([\d.]+)", data.get("最小变动价位") or "")
    if not m:
        return

    try:
        tick = float(m.group(1))
    except ValueError:
        # The pattern also accepts runs such as "." or "1.2.3" that are not
        # valid numbers; a hint is optional, so skip it rather than crash.
        return

    data["最小变动价位_hint"] = f"一手合约最小波动{round(tick * mult, 2)}元"
|
|
|
|
|
|
def _fetch_em_direct(em_symbol: str) -> dict[str, str]:
    """Fetch contract details from Eastmoney over plain HTTP.

    Loads the quote page for ``em_symbol``, extracts the internal
    ``futures_<id>`` identifier from its text, then requests the matching
    ``<id>_info`` JSON document and maps its keys to display labels via
    ``EM_LABEL_MAP``.

    Returns:
        Mapping of display label to cleaned, non-empty value.

    Raises:
        ValueError: the identifier cannot be found, the payload is not a JSON
            object, or no known field carries a value.
        requests.RequestException: network failure or a non-2xx response.
    """
    page_url = f"https://quote.eastmoney.com/qihuo/{em_symbol}.html"
    page = requests.get(page_url, timeout=12)
    # Fail fast on HTTP errors instead of scanning an error page.
    page.raise_for_status()
    page.encoding = page.apparent_encoding or "utf-8"

    # A single pattern suffices: the former "#futures_..." and "/futures_..."
    # variants could only match text that this pattern matches first.
    m = re.search(r"futures_([A-Za-z0-9_]+)", page.text)
    inner = m.group(1).replace("futures_", "") if m else ""
    if not inner:
        raise ValueError("无法解析东方财富合约标识")

    info_url = f"https://futsse-static.eastmoney.com/redis?msgid={inner}_info"
    info = requests.get(info_url, timeout=12)
    info.raise_for_status()
    payload = info.json()
    if not isinstance(payload, dict):
        raise ValueError("东方财富返回数据无效")

    out: dict[str, str] = {}
    for key, label in EM_LABEL_MAP.items():
        val = _clean_value(payload.get(key))
        if val:
            out[label] = val
    if not out:
        raise ValueError("东方财富合约字段为空")
    return out
|
|
|
|
|
|
def _fetch_em_akshare(em_symbol: str) -> dict[str, str]:
    """Fetch Eastmoney contract details through AKShare.

    Reads the ``item`` / ``value`` columns of the frame returned by
    ``ak.futures_contract_detail_em`` and renames a handful of labels so they
    match the names used elsewhere in this module. Rows with an empty label
    or value are dropped.

    Raises:
        ImportError: AKShare is not installed.
    """
    import akshare as ak

    # Label as seen in the AKShare frame -> label used by this module.
    aliases = {
        "跌涨停板幅度": "涨跌停幅度",
        "最后交割日": "交割日期",
        "上市交易所": "交易所",
        "合约交割月份": "合约月份",
        "最初交易保证金": "最低交易保证金",
        "最小变动价格": "最小变动价位",
    }

    frame = ak.futures_contract_detail_em(symbol=em_symbol)
    fields: dict[str, str] = {}
    for _, record in frame.iterrows():
        name = _clean_value(record.get("item"))
        text = _clean_value(record.get("value"))
        if not (name and text):
            continue
        fields[aliases.get(name, name)] = text
    return fields
|
|
|
|
|
|
def _fetch_sina_direct(sina_symbol: str) -> dict[str, str]:
    """Scrape contract details from the Sina futures quote page.

    Parses every HTML table on the page with pandas and uses the seventh one
    (index 6), which is read as three side-by-side label/value column pairs.
    The pairs are stacked into a single item/value list and cleaned.

    Returns:
        Mapping of label to value. Pairs with an empty side, a label longer
        than 80 characters, or a value containing "发帖" are skipped.

    Raises:
        ValueError: fewer than seven tables were found, or pandas could not
            parse or relabel the table.
        requests.RequestException: network failure or a non-2xx response.
    """
    from io import StringIO

    import pandas as pd

    url = f"https://finance.sina.com.cn/futures/quotes/{sina_symbol}.shtml"
    r = requests.get(url, timeout=12, headers={"Referer": "https://finance.sina.com.cn/"})
    # Fail fast on HTTP errors instead of parsing an error page as data.
    r.raise_for_status()
    r.encoding = "gb2312"

    tables = pd.read_html(StringIO(r.text))
    if len(tables) < 7:
        raise ValueError("新浪页面结构变化")
    detail = tables[6]

    parts = []
    for cols in (slice(0, 2), slice(2, 4), slice(4, None)):
        # Relabel an explicit copy so the parsed table itself is not touched.
        part = detail.iloc[:, cols].copy()
        part.columns = ["item", "value"]
        parts.append(part)
    merged = pd.concat(parts, axis=0, ignore_index=True)

    out: dict[str, str] = {}
    for _, row in merged.iterrows():
        label = _clean_value(row["item"])
        val = _clean_value(row["value"])
        if not label or not val or len(label) > 80 or "发帖" in val:
            continue
        out[label] = val
    return out
|
|
|
|
|
|
def _fetch_sina_akshare(sina_symbol: str) -> dict[str, str]:
    """Fetch Sina contract details through AKShare.

    Reads the ``item`` / ``value`` columns of the frame returned by
    ``ak.futures_contract_detail`` and keeps every pair whose label and value
    are both non-empty and whose value does not contain "发帖".

    Raises:
        ImportError: AKShare is not installed.
    """
    import akshare as ak

    frame = ak.futures_contract_detail(symbol=sina_symbol)
    fields: dict[str, str] = {}
    for _, record in frame.iterrows():
        name = _clean_value(record.get("item"))
        text = _clean_value(record.get("value"))
        if not name or not text or "发帖" in text:
            continue
        fields[name] = text
    return fields
|
|
|
|
|
|
def _merge_profile(primary: dict[str, str], secondary: dict[str, str]) -> dict[str, str]:
|
|
merged = dict(secondary)
|
|
merged.update(primary)
|
|
return merged
|
|
|
|
|
|
def get_contract_profile(raw_symbol: str) -> Optional[dict]:
    """Build a display-ready contract profile for ``raw_symbol``.

    The symbol is normalised, then looked up on Eastmoney and on Sina. Each
    source is tried through AKShare first and through direct HTTP only when
    AKShare is not installed. Any other failure is logged and that source is
    skipped. Eastmoney values take precedence where both sources provide the
    same label.

    Returns:
        ``None`` when the symbol is blank or no source yields any row;
        otherwise a dict with ``ths_code``, ``symbol_name``, ``exchange``,
        ``rows`` (see ``_rows_from_dict``) and ``source`` (the names of the
        sources that actually contributed data, joined by " + ").
    """
    ths_code = _normalize_ths_code(raw_symbol)
    if not ths_code:
        return None

    em_symbol = _to_em_page_symbol(ths_code)
    sina_symbol = _to_sina_quote_symbol(ths_code)
    data: dict[str, str] = {}
    source_parts: list[str] = []

    # Eastmoney: per the original note, its fields are close to the profile
    # shown in trading terminals.
    try:
        try:
            data = _fetch_em_akshare(em_symbol) or {}
        except ImportError:
            data = _fetch_em_direct(em_symbol) or {}
    except Exception as exc:
        logger.warning("eastmoney profile failed %s: %s", em_symbol, exc)
    # Credit a source only when it actually returned fields (previously
    # Eastmoney was credited even for an empty result, unlike Sina).
    if data:
        source_parts.append("东方财富")

    # Sina supplements fields such as delivery location and listing date.
    sina_data: dict[str, str] = {}
    try:
        try:
            sina_data = _fetch_sina_akshare(sina_symbol) or {}
        except ImportError:
            sina_data = _fetch_sina_direct(sina_symbol) or {}
    except Exception as exc:
        logger.warning("sina profile failed %s: %s", sina_symbol, exc)
    if sina_data:
        source_parts.append("新浪")
        data = _merge_profile(data, sina_data)

    if not data:
        return None

    _add_computed_hints(ths_code, data)
    rows = _rows_from_dict(data)
    if not rows:
        return None

    return {
        "ths_code": ths_code,
        "symbol_name": data.get("交易品种", ""),
        "exchange": data.get("交易所", ""),
        "rows": rows,
        "source": " + ".join(source_parts) if source_parts else "未知",
    }
|