+66
@@ -5,6 +5,7 @@
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from datetime import date
|
||||
from typing import Optional
|
||||
@@ -64,6 +65,7 @@ PRODUCTS = [
|
||||
{"name": "中证1000", "ths": "IM", "sina": "IM", "exchange": "中金所", "ex": "CFFEX"},
|
||||
]
|
||||
|
||||
EXCHANGE_ORDER = ["上期所", "上期能源", "大商所", "郑商所", "中金所"]
|
||||
_MAIN_CACHE: dict[str, tuple[float, dict]] = {}
|
||||
_CACHE_TTL = 300
|
||||
_main_index_lock = threading.Lock()
|
||||
@@ -167,6 +169,47 @@ def ths_to_sina_code(ths_code: str) -> Optional[str]:
|
||||
return codes["sina_code"] if codes else None
|
||||
|
||||
|
||||
def parse_contract_year_month(ths_code: str) -> Optional[tuple[int, int]]:
|
||||
"""从同花顺合约代码解析交割年月。"""
|
||||
code = (ths_code or "").strip()
|
||||
if not code or "888" in code:
|
||||
return None
|
||||
m4 = re.match(r"^([A-Za-z]+)(\d{4})$", code)
|
||||
if m4:
|
||||
digits = m4.group(2)
|
||||
year = 2000 + int(digits[:2])
|
||||
month = int(digits[2:])
|
||||
if 1 <= month <= 12:
|
||||
return year, month
|
||||
m3 = re.match(r"^([A-Za-z]+)(\d{3})$", code)
|
||||
if m3:
|
||||
letters, digits = m3.group(1), m3.group(2)
|
||||
month = int(digits[1:])
|
||||
if not 1 <= month <= 12:
|
||||
return None
|
||||
y_digit = int(digits[0])
|
||||
year = date.today().year
|
||||
decade = year // 10 * 10
|
||||
candidate = decade + y_digit
|
||||
if candidate < year - 1:
|
||||
candidate += 10
|
||||
product = _find_product_by_letters(letters)
|
||||
if product:
|
||||
return candidate, month
|
||||
return None
|
||||
|
||||
|
||||
def is_near_expiry_main(ths_code: str) -> bool:
|
||||
"""主力合约交割月为当月或下月时视为临期。"""
|
||||
ym = parse_contract_year_month(ths_code)
|
||||
if not ym:
|
||||
return False
|
||||
cy, cm = ym
|
||||
today = date.today()
|
||||
months_ahead = (cy - today.year) * 12 + (cm - today.month)
|
||||
return months_ahead <= 1
|
||||
|
||||
|
||||
def _make_symbol_item(product: dict, year: int, month: int, volume: float) -> dict:
|
||||
ths = build_ths_code(product, year, month)
|
||||
name = product["name"]
|
||||
@@ -238,6 +281,7 @@ def _enrich_item(item: dict) -> dict:
|
||||
out = dict(item)
|
||||
if not out.get("input_label"):
|
||||
out["input_label"] = f"{out.get('name', '')} {out.get('ths_code', '')}".strip()
|
||||
out["near_expiry"] = is_near_expiry_main(out.get("ths_code", ""))
|
||||
return out
|
||||
|
||||
|
||||
@@ -351,6 +395,28 @@ def search_symbols(query: str) -> list:
|
||||
return results
|
||||
|
||||
|
||||
def list_main_contracts_grouped() -> list[dict]:
|
||||
"""按交易所分类返回全部品种主力合约(行情页下拉用)。"""
|
||||
with _main_index_lock:
|
||||
index = dict(_main_index)
|
||||
index_ready = bool(index)
|
||||
|
||||
buckets: dict[str, list] = defaultdict(list)
|
||||
for p in PRODUCTS:
|
||||
main = index.get(p["sina"])
|
||||
if not main and not index_ready:
|
||||
main = _stub_main_contract(p)
|
||||
if main:
|
||||
buckets[p["exchange"]].append(_enrich_item(main))
|
||||
|
||||
groups: list[dict] = []
|
||||
for cat in EXCHANGE_ORDER:
|
||||
items = buckets.get(cat)
|
||||
if items:
|
||||
groups.append({"category": cat, "items": items})
|
||||
return groups
|
||||
|
||||
|
||||
_start_warm_thread()
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user