# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""Mapping between futures products and Tonghuashun (THS) contract codes.

The UI displays THS contract codes (e.g. ``ag2608``). Quotes come from Sina by
default; institutional users can enable THS iFinD through an environment
variable (handled by the ``market`` module, not here).

Importing this module starts a daemon thread that periodically refreshes the
in-memory index of main contracts (see ``_start_warm_thread``).
"""
import logging
import re
import threading
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import date
from typing import Optional

from market import fetch_raw_for_volume, get_price as market_get_price, THS_EX_SUFFIX

logger = logging.getLogger(__name__)

# One entry per tradable product:
#   name     - display name
#   ths      - letters used in THS contract codes (case matters per exchange)
#   sina     - letters used in Sina quote codes
#   exchange - display name of the exchange
#   ex       - exchange code (SHFE / INE / DCE / CZCE / CFFEX)
PRODUCTS = [
    {"name": "白银", "ths": "ag", "sina": "AG", "exchange": "上期所", "ex": "SHFE"},
    {"name": "黄金", "ths": "au", "sina": "AU", "exchange": "上期所", "ex": "SHFE"},
    {"name": "铜", "ths": "cu", "sina": "CU", "exchange": "上期所", "ex": "SHFE"},
    {"name": "铝", "ths": "al", "sina": "AL", "exchange": "上期所", "ex": "SHFE"},
    {"name": "锌", "ths": "zn", "sina": "ZN", "exchange": "上期所", "ex": "SHFE"},
    {"name": "铅", "ths": "pb", "sina": "PB", "exchange": "上期所", "ex": "SHFE"},
    {"name": "镍", "ths": "ni", "sina": "NI", "exchange": "上期所", "ex": "SHFE"},
    {"name": "锡", "ths": "sn", "sina": "SN", "exchange": "上期所", "ex": "SHFE"},
    {"name": "螺纹钢", "ths": "rb", "sina": "RB", "exchange": "上期所", "ex": "SHFE"},
    {"name": "热卷", "ths": "hc", "sina": "HC", "exchange": "上期所", "ex": "SHFE"},
    {"name": "不锈钢", "ths": "ss", "sina": "SS", "exchange": "上期所", "ex": "SHFE"},
    {"name": "原油", "ths": "sc", "sina": "SC", "exchange": "上期能源", "ex": "INE"},
    {"name": "燃油", "ths": "fu", "sina": "FU", "exchange": "上期所", "ex": "SHFE"},
    {"name": "沥青", "ths": "bu", "sina": "BU", "exchange": "上期所", "ex": "SHFE"},
    {"name": "橡胶", "ths": "ru", "sina": "RU", "exchange": "上期所", "ex": "SHFE"},
    {"name": "纸浆", "ths": "sp", "sina": "SP", "exchange": "上期所", "ex": "SHFE"},
    {"name": "铁矿石", "ths": "i", "sina": "I", "exchange": "大商所", "ex": "DCE"},
    {"name": "焦炭", "ths": "j", "sina": "J", "exchange": "大商所", "ex": "DCE"},
    {"name": "焦煤", "ths": "jm", "sina": "JM", "exchange": "大商所", "ex": "DCE"},
    {"name": "豆粕", "ths": "m", "sina": "M", "exchange": "大商所", "ex": "DCE"},
    {"name": "豆油", "ths": "y", "sina": "Y", "exchange": "大商所", "ex": "DCE"},
    {"name": "棕榈油", "ths": "p", "sina": "P", "exchange": "大商所", "ex": "DCE"},
    {"name": "玉米", "ths": "c", "sina": "C", "exchange": "大商所", "ex": "DCE"},
    {"name": "淀粉", "ths": "cs", "sina": "CS", "exchange": "大商所", "ex": "DCE"},
    {"name": "鸡蛋", "ths": "jd", "sina": "JD", "exchange": "大商所", "ex": "DCE"},
    {"name": "生猪", "ths": "lh", "sina": "LH", "exchange": "大商所", "ex": "DCE"},
    {"name": "聚乙烯", "ths": "l", "sina": "L", "exchange": "大商所", "ex": "DCE"},
    {"name": "聚丙烯", "ths": "pp", "sina": "PP", "exchange": "大商所", "ex": "DCE"},
    {"name": "PVC", "ths": "v", "sina": "V", "exchange": "大商所", "ex": "DCE"},
    {"name": "乙二醇", "ths": "eg", "sina": "EG", "exchange": "大商所", "ex": "DCE"},
    {"name": "苯乙烯", "ths": "eb", "sina": "EB", "exchange": "大商所", "ex": "DCE"},
    {"name": "液化气", "ths": "pg", "sina": "PG", "exchange": "大商所", "ex": "DCE"},
    {"name": "菜粕", "ths": "RM", "sina": "RM", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "菜油", "ths": "OI", "sina": "OI", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "白糖", "ths": "SR", "sina": "SR", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "棉花", "ths": "CF", "sina": "CF", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "甲醇", "ths": "MA", "sina": "MA", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "PTA", "ths": "TA", "sina": "TA", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "玻璃", "ths": "FG", "sina": "FG", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "纯碱", "ths": "SA", "sina": "SA", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "尿素", "ths": "UR", "sina": "UR", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "硅铁", "ths": "SF", "sina": "SF", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "锰硅", "ths": "SM", "sina": "SM", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "苹果", "ths": "AP", "sina": "AP", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "红枣", "ths": "CJ", "sina": "CJ", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "花生", "ths": "PK", "sina": "PK", "exchange": "郑商所", "ex": "CZCE"},
    {"name": "沪深300", "ths": "IF", "sina": "IF", "exchange": "中金所", "ex": "CFFEX"},
    {"name": "上证50", "ths": "IH", "sina": "IH", "exchange": "中金所", "ex": "CFFEX"},
    {"name": "中证500", "ths": "IC", "sina": "IC", "exchange": "中金所", "ex": "CFFEX"},
    {"name": "中证1000", "ths": "IM", "sina": "IM", "exchange": "中金所", "ex": "CFFEX"},
]

# THS product letters -> category label (keys use the same casing as PRODUCTS["ths"]).
PRODUCT_CATEGORY_MAP = {
    "ag": "贵金属", "au": "贵金属",
    "cu": "有色金属", "al": "有色金属", "zn": "有色金属",
    "pb": "有色金属", "ni": "有色金属", "sn": "有色金属",
    "rb": "黑色金属", "hc": "黑色金属", "ss": "黑色金属", "i": "黑色金属",
    "j": "黑色金属", "jm": "黑色金属", "SF": "黑色金属", "SM": "黑色金属",
    "sc": "能源化工", "fu": "能源化工", "bu": "能源化工", "ru": "能源化工",
    "sp": "能源化工", "l": "能源化工", "pp": "能源化工", "v": "能源化工",
    "eg": "能源化工", "eb": "能源化工", "pg": "能源化工", "MA": "能源化工",
    "TA": "能源化工", "SA": "能源化工", "UR": "能源化工", "FG": "能源化工",
    "m": "农产品", "y": "农产品", "p": "农产品", "c": "农产品", "cs": "农产品",
    "jd": "农产品", "lh": "农产品", "RM": "农产品", "OI": "农产品", "SR": "农产品",
    "CF": "农产品", "AP": "农产品", "CJ": "农产品", "PK": "农产品",
    "IF": "金融期货", "IH": "金融期货", "IC": "金融期货", "IM": "金融期货",
}

PRODUCT_CATEGORIES = ["贵金属", "有色金属", "黑色金属", "能源化工", "农产品", "金融期货"]

# Attach the category to every product record once, at import time.
for _p in PRODUCTS:
    _p["category"] = PRODUCT_CATEGORY_MAP.get(_p["ths"], "其他")


def product_category(ths: str) -> str:
    """Return the category label for THS product letters, or "其他" if unknown.

    The exact-case lookup is tried first; if it misses, the letters are
    resolved case-insensitively through ``PRODUCTS`` so that e.g. ``"AG"`` and
    ``"ma"`` get the same category as ``"ag"`` and ``"MA"``.
    """
    key = (ths or "").strip()
    category = PRODUCT_CATEGORY_MAP.get(key)
    if category is None and key:
        product = _find_product_by_letters(key)
        if product:
            category = PRODUCT_CATEGORY_MAP.get(product["ths"])
    return category or "其他"


# Display order of exchange groups in the grouped drop-downs.
EXCHANGE_ORDER = ["上期所", "上期能源", "大商所", "郑商所", "中金所"]

# Per-product cache of resolved main contracts: sina letters -> (timestamp, item).
_MAIN_CACHE: dict[str, tuple[float, dict]] = {}
_CACHE_TTL = 300  # seconds; also the sleep interval of the warm-up loop

# Snapshot of enriched main-contract items, keyed by sina letters.
_main_index_lock = threading.Lock()
_main_index: dict[str, dict] = {}
_main_index_ts = 0.0
# Serializes whole-index refreshes so two refreshes never run concurrently.
_index_refresh_lock = threading.Lock()

# Contract-code shapes: letters followed by YYMM (4 digits) or YMM (3 digits).
_CODE_4D_RE = re.compile(r"^([A-Za-z]+)(\d{4})$")
_CODE_3D_RE = re.compile(r"^([A-Za-z]+)(\d{3})$")
_LEADING_LETTERS_RE = re.compile(r"^([A-Za-z]+)")

# Number of consecutive delivery months (starting with the current one)
# scanned when looking for the main contract.
_MAIN_SCAN_MONTHS = 14


def build_ths_code(product: dict, year: int, month: int) -> str:
    """Return the contract code as displayed inside the THS client.

    CZCE products use a single-digit year (``MA609``); every other exchange
    uses a two-digit year (``ag2608``).
    """
    letters = product["ths"]
    if product["ex"] == "CZCE":
        return f"{letters}{year % 10}{month:02d}"
    return f"{letters}{year % 100:02d}{month:02d}"


def build_ths_full_code(product: dict, year: int, month: int) -> str:
    """Return the THS iFinD HTTP API code, e.g. ``ag2608.SHFE``.

    The suffix comes from ``THS_EX_SUFFIX``; when the exchange has no entry
    there, the exchange code itself is used.
    """
    ths = build_ths_code(product, year, month)
    suffix = THS_EX_SUFFIX.get(product["ex"], product["ex"])
    return f"{ths}.{suffix}"


def build_sina_code(product: dict, year: int, month: int) -> str:
    """Return the Sina quote code for a specific delivery month.

    CFFEX products get the ``CFF_RE_`` prefix, all others ``nf_``; the year is
    always two digits.
    """
    letters = product["sina"]
    suffix = f"{year % 100:02d}{month:02d}"
    if product["ex"] == "CFFEX":
        return f"CFF_RE_{letters}{suffix}"
    return f"nf_{letters}{suffix}"


def build_sina_main_code(product: dict) -> str:
    """Return the Sina code of the product's continuous main contract (suffix ``0``)."""
    letters = product["sina"]
    if product["ex"] == "CFFEX":
        return f"CFF_RE_{letters}0"
    return f"nf_{letters}0"


def _find_product_by_letters(letters: str) -> Optional[dict]:
    """Find a product by its THS or Sina letters, ignoring case; None if absent."""
    letters_up = letters.upper()
    for p in PRODUCTS:
        if p["ths"].upper() == letters_up or p["sina"] == letters_up:
            return p
    return None


def _product_codes(product: dict, ths_code: str, market_code: str, sina_code: str) -> dict:
    """Bundle the code variants of one contract together with product metadata."""
    return {
        "ths_code": ths_code,
        "market_code": market_code,
        "sina_code": sina_code,
        "ex": product["ex"],
        "name": product["name"],
        "exchange": product["exchange"],
    }


def _resolve_single_digit_year(y_digit: int) -> int:
    """Expand a one-digit contract year to a full year.

    The digit is placed in the current decade; if that lands more than one
    year in the past it is assumed to belong to the next decade.
    """
    year = date.today().year
    candidate = year // 10 * 10 + y_digit
    if candidate < year - 1:
        candidate += 10
    return candidate


def ths_to_codes(ths_code: str) -> Optional[dict]:
    """Convert a THS contract code into the full THS code plus the Sina fallback code.

    Accepts ``<letters><YYMM>`` and ``<letters><YMM>`` forms (case-insensitive
    letters). Returns a dict with keys ``ths_code``, ``market_code``,
    ``sina_code``, ``ex``, ``name`` and ``exchange``, or ``None`` when the code
    is empty, has an invalid month, or names an unknown product.
    """
    code = (ths_code or "").strip()
    if not code:
        return None

    m4 = _CODE_4D_RE.match(code)
    if m4:
        letters, digits = m4.group(1), m4.group(2)
        year = 2000 + int(digits[:2])
        month = int(digits[2:])
        if not 1 <= month <= 12:
            return None
        product = _find_product_by_letters(letters)
        if product:
            return _product_codes(
                product,
                build_ths_code(product, year, month),
                build_ths_full_code(product, year, month),
                build_sina_code(product, year, month),
            )
        # CFFEX letters without a PRODUCTS entry (in practice the treasury
        # futures T / TF / TS) are still mapped, using the letters as the name.
        letters_up = letters.upper()
        if letters_up in ("IF", "IH", "IC", "IM", "T", "TF", "TS"):
            ths = f"{letters_up}{digits}"
            return {
                "ths_code": ths,
                "market_code": f"{ths}.CFFEX",
                "sina_code": f"CFF_RE_{letters_up}{digits}",
                "ex": "CFFEX",
                "name": letters_up,
                "exchange": "中金所",
            }
        # A 4-digit code can never match the 3-digit pattern below.
        return None

    m3 = _CODE_3D_RE.match(code)
    if m3:
        letters, digits = m3.group(1), m3.group(2)
        month = int(digits[1:])
        if not 1 <= month <= 12:
            return None
        year = _resolve_single_digit_year(int(digits[0]))
        product = _find_product_by_letters(letters)
        if product:
            return _product_codes(
                product,
                build_ths_code(product, year, month),
                build_ths_full_code(product, year, month),
                build_sina_code(product, year, month),
            )
    return None


def ths_to_sina_code(ths_code: str) -> Optional[str]:
    """Return only the Sina code for a THS contract code, or None if unparseable."""
    codes = ths_to_codes(ths_code)
    return codes["sina_code"] if codes else None


def parse_contract_year_month(ths_code: str) -> Optional[tuple[int, int]]:
    """Parse the delivery (year, month) out of a THS contract code.

    Returns ``None`` for empty input, codes containing ``888`` (continuous
    contracts), invalid months, and 3-digit codes whose letters are not a
    known product.
    """
    code = (ths_code or "").strip()
    if not code or "888" in code:
        return None

    m4 = _CODE_4D_RE.match(code)
    if m4:
        digits = m4.group(2)
        month = int(digits[2:])
        if 1 <= month <= 12:
            return 2000 + int(digits[:2]), month
        return None

    m3 = _CODE_3D_RE.match(code)
    if m3:
        letters, digits = m3.group(1), m3.group(2)
        month = int(digits[1:])
        if not 1 <= month <= 12:
            return None
        if _find_product_by_letters(letters):
            return _resolve_single_digit_year(int(digits[0])), month
    return None


def is_near_expiry_main(ths_code: str) -> bool:
    """Return True when the contract's delivery month is the current or next month.

    Contracts whose delivery month is already in the past also return True;
    codes that cannot be parsed return False.
    """
    ym = parse_contract_year_month(ths_code)
    if not ym:
        return False
    cy, cm = ym
    today = date.today()
    months_ahead = (cy - today.year) * 12 + (cm - today.month)
    return months_ahead <= 1


def _main_contract_score(raw: dict) -> float:
    """Score a contract for main-contract selection: open interest first, else volume."""
    oi = float(raw.get("open_interest") or 0)
    vol = float(raw.get("volume") or 0)
    return oi if oi > 0 else vol


def _make_symbol_item(
    product: dict,
    year: int,
    month: int,
    volume: float,
    open_interest: float = 0,
) -> dict:
    """Build the drop-down/search item describing one dated contract of a product."""
    ths = build_ths_code(product, year, month)
    name = product["name"]
    return {
        "name": name,
        "ths_code": ths,
        "market_code": build_ths_full_code(product, year, month),
        "sina_code": build_sina_code(product, year, month),
        "exchange": product["exchange"],
        "contract": f"主力 {ths}",
        "display": f"{name} 主力 {ths}",
        "input_label": f"{name} {ths}",
        "volume": volume,
        "open_interest": open_interest,
    }


def resolve_main_contract(product: dict) -> Optional[dict]:
    """Determine the product's main contract, using a per-product TTL cache.

    Scans the next ``_MAIN_SCAN_MONTHS`` delivery months and picks the one
    with the highest score (see ``_main_contract_score``). If no dated
    contract has a positive score, falls back to the Sina continuous main
    contract, reported with a ``<letters>888`` THS code. Returns ``None`` when
    nothing could be fetched; only successful results are cached.

    NOTE(review): ``fetch_raw_for_volume`` is assumed to return a dict that may
    carry ``volume`` / ``open_interest`` (or a falsy value on failure) —
    confirm against the ``market`` module.
    """
    cache_key = product["sina"]
    now = time.time()
    cached = _MAIN_CACHE.get(cache_key)
    if cached and now - cached[0] < _CACHE_TTL:
        return cached[1]

    today = date.today()
    best = None
    best_score = 0.0
    for offset in range(_MAIN_SCAN_MONTHS):
        years_ahead, month_index = divmod(today.month - 1 + offset, 12)
        cy, cm = today.year + years_ahead, month_index + 1
        raw = fetch_raw_for_volume(build_sina_code(product, cy, cm))
        if not raw:
            continue
        score = _main_contract_score(raw)
        # best_score starts at 0, so this also skips non-positive scores;
        # ties keep the earlier delivery month.
        if score <= best_score:
            continue
        best_score = score
        # The score may come from open interest alone, so "volume" is not
        # guaranteed to be present in the row.
        best = _make_symbol_item(
            product,
            cy,
            cm,
            raw.get("volume") or 0,
            raw.get("open_interest") or 0,
        )

    if best is None:
        sina_main = build_sina_main_code(product)
        raw = fetch_raw_for_volume(sina_main)
        if raw:
            ths_letters = product["ths"]
            ths_main = (
                f"{ths_letters}888"
                if product["ex"] != "CFFEX"
                else f"{ths_letters.upper()}888"
            )
            suffix = THS_EX_SUFFIX.get(product["ex"], product["ex"])
            best = {
                "name": product["name"],
                "ths_code": ths_main,
                "market_code": f"{ths_main}.{suffix}",
                "sina_code": sina_main,
                "exchange": product["exchange"],
                "contract": f"主力连续 {ths_main}",
                "display": f"{product['name']} 主力连续 {ths_main}",
                "input_label": f"{product['name']} {ths_main}",
                "volume": raw.get("volume", 0),
                # Same item shape as _make_symbol_item.
                "open_interest": raw.get("open_interest", 0),
            }

    if best:
        _MAIN_CACHE[cache_key] = (now, best)
    return best


def _enrich_item(item: dict) -> dict:
    """Return a copy of *item* with ``input_label`` filled in and ``near_expiry`` set."""
    out = dict(item)
    if not out.get("input_label"):
        out["input_label"] = f"{out.get('name', '')} {out.get('ths_code', '')}".strip()
    out["near_expiry"] = is_near_expiry_main(out.get("ths_code", ""))
    return out


def refresh_main_index() -> None:
    """Resolve the main contract of every product and swap in a new index.

    Runs the lookups on a thread pool so that searches only read the local
    snapshot. A failure for one product is logged and that product is simply
    left out of the new index.
    """
    global _main_index, _main_index_ts
    with _index_refresh_lock:
        new_idx: dict[str, dict] = {}
        with ThreadPoolExecutor(max_workers=10) as pool:
            futures = {pool.submit(resolve_main_contract, p): p for p in PRODUCTS}
            for fut in as_completed(futures):
                product = futures[fut]
                try:
                    main = fut.result()
                    if main:
                        new_idx[product["sina"]] = _enrich_item(main)
                except Exception:
                    # Best-effort: one bad product must not abort the refresh.
                    logger.warning(
                        "failed to resolve main contract for %s",
                        product["sina"],
                        exc_info=True,
                    )
        with _main_index_lock:
            _main_index = new_idx
            _main_index_ts = time.time()


def _warm_loop() -> None:
    """Refresh the main-contract index forever, sleeping ``_CACHE_TTL`` between runs."""
    while True:
        try:
            refresh_main_index()
        except Exception:
            # Keep the background thread alive; the next cycle retries.
            logger.exception("main-contract index refresh failed")
        time.sleep(_CACHE_TTL)


def _start_warm_thread() -> None:
    """Start the index warm-up loop on a daemon thread."""
    threading.Thread(target=_warm_loop, daemon=True).start()


def _stub_main_contract(product: dict) -> dict:
    """Quick placeholder (current-month contract) used while the index is not ready.

    Prevents the very first search from coming back empty.
    """
    today = date.today()
    return _enrich_item(_make_symbol_item(product, today.year, today.month, 0))


def _product_matches(product: dict, q_lower: str) -> bool:
    """Tell whether a lower-cased query matches the product's name or code.

    Name: substring match. Code: an exact match is accepted at any length (so
    single-letter products such as ``i`` or ``m`` can be found); substring
    matches require at least two characters to avoid noise.
    """
    if q_lower in product["name"].lower():
        return True
    ths_lower = product["ths"].lower()
    sina_lower = product["sina"].lower()
    if q_lower in (ths_lower, sina_lower):
        return True
    if len(q_lower) >= 2:
        return q_lower in ths_lower or q_lower in sina_lower
    return False


def _match_score(product: dict, q_lower: str) -> int:
    """Rank a matching product: name matches beat code matches, exact beats prefix."""
    name_lower = product["name"].lower()
    if name_lower == q_lower:
        return 200
    if name_lower.startswith(q_lower):
        return 150
    if q_lower in name_lower:
        return 100
    ths_lower = product["ths"].lower()
    if ths_lower == q_lower:
        return 90
    # Exact matches are tested before the prefix match so this stays reachable.
    if product["sina"].lower() == q_lower:
        return 80
    if ths_lower.startswith(q_lower):
        return 70
    return 10


def search_symbols(query: str) -> list:
    """Search products by name or code and return up to 12 main-contract items.

    Results come from the in-memory index (or current-month stubs while the
    index is still empty), ordered by ``_match_score``. When no product
    matches and the query has at least 3 characters, it is interpreted as a
    concrete contract code and a single item for that contract is returned.
    """
    q = query.strip()
    if not q:
        return []
    q_lower = q.lower()

    with _main_index_lock:
        index = dict(_main_index)
    index_ready = bool(index)

    scored: list[tuple[int, dict]] = []
    for p in PRODUCTS:
        if not _product_matches(p, q_lower):
            continue
        main = index.get(p["sina"])
        if not main and not index_ready:
            main = _stub_main_contract(p)
        if main:
            scored.append((_match_score(p, q_lower), main))

    # Stable sort: equal scores keep PRODUCTS order.
    scored.sort(key=lambda pair: -pair[0])
    results = [item for _, item in scored[:12]]

    if not results and len(q) >= 3:
        codes = ths_to_codes(q)
        if codes:
            raw = fetch_raw_for_volume(codes["sina_code"]) or {}
            # Prefer the feed's name, then the product name, then the query.
            name = raw.get("name") or codes.get("name") or q
            results.append(_enrich_item({
                "name": name,
                "ths_code": codes["ths_code"],
                "market_code": codes["market_code"],
                "sina_code": codes["sina_code"],
                "exchange": codes.get("exchange") or "",
                "contract": codes["ths_code"],
                "display": f"{name} ({codes['ths_code']})",
                "volume": raw.get("volume", 0),
            }))
    return results


# THS letters -> product, by exact casing first and lower-case as a fallback.
_THS_TO_PRODUCT = {p["ths"]: p for p in PRODUCTS}
for _p in PRODUCTS:
    _THS_TO_PRODUCT.setdefault(_p["ths"].lower(), _p)


def _product_for_ths(ths: str) -> Optional[dict]:
    """Look up a product by THS letters (exact, then lower-cased); None if unknown."""
    key = (ths or "").strip()
    if not key:
        return None
    return _THS_TO_PRODUCT.get(key) or _THS_TO_PRODUCT.get(key.lower())


def _product_for_contract_code(ths_code: str) -> Optional[dict]:
    """Look up the product from the leading letters of a contract code."""
    sym = (ths_code or "").strip()
    if not sym:
        return None
    m = _LEADING_LETTERS_RE.match(sym)
    if not m:
        return None
    return _find_product_by_letters(m.group(1))


def position_symbol_meta(ths_code: str) -> dict:
    """Return display metadata for a position/order: name, exchange, is_main.

    ``is_main`` is True when the (normalized) code equals the main contract
    currently held in the index for that product. Unknown products are
    reported with the raw code as their name.
    """
    sym = (ths_code or "").strip()
    if not sym:
        return {"name": "", "exchange": "", "is_main": False}
    product = _product_for_contract_code(sym)
    if not product:
        return {"name": sym, "exchange": "", "is_main": False}

    codes = ths_to_codes(sym)
    norm = (codes["ths_code"] if codes else sym).strip().lower()
    is_main = False
    with _main_index_lock:
        main_item = _main_index.get(product["sina"])
    if main_item:
        main_ths = (main_item.get("ths_code") or "").strip().lower()
        is_main = main_ths == norm or main_ths == sym.lower()
    return {
        "name": product["name"],
        "exchange": product.get("exchange") or "",
        "is_main": is_main,
    }


def _item_from_recommend_row(row: dict, product: dict) -> Optional[dict]:
    """Build a drop-down item from a cached "can open" row without network calls.

    Preference order: the row's own ``main_code`` (if parseable), then the
    indexed main contract, then a current-month stub. ``max_lots`` from the
    row is copied onto the item when present.
    """
    name = row.get("name") or product["name"]
    main_code = (row.get("main_code") or "").strip()
    max_lots = row.get("max_lots")

    if main_code:
        codes = ths_to_codes(main_code)
        if codes:
            ths = codes["ths_code"]
            item = {
                "name": name,
                "ths_code": ths,
                "market_code": codes.get("market_code") or "",
                "sina_code": codes.get("sina_code") or "",
                "exchange": product["exchange"],
                "contract": f"主力 {ths}",
                "display": f"{name} 主力 {ths}",
                "input_label": f"{name} {ths}",
            }
            if max_lots is not None:
                item["max_lots"] = max_lots
            return _enrich_item(item)

    with _main_index_lock:
        main = _main_index.get(product["sina"])
    if main:
        item = dict(main)
        if max_lots is not None:
            item["max_lots"] = max_lots
        return _enrich_item(item)

    item = _stub_main_contract(product)
    if max_lots is not None:
        item["max_lots"] = max_lots
    return item


def _groups_in_exchange_order(buckets: dict) -> list[dict]:
    """Turn exchange -> items buckets into a list ordered by ``EXCHANGE_ORDER``."""
    groups: list[dict] = []
    for cat in EXCHANGE_ORDER:
        items = buckets.get(cat)
        if items:
            groups.append({"category": cat, "items": items})
    return groups


def list_recommended_symbols_grouped(recommend_rows: list[dict]) -> list[dict]:
    """Return main contracts of the openable products, grouped by exchange.

    Used by the product-selection drop-down. Only rows whose ``status`` is
    ``"ok"`` or ``"margin_ok"`` are kept, and each product appears once.
    """
    if not recommend_rows:
        return []
    buckets: dict[str, list] = defaultdict(list)
    seen: set[str] = set()
    for row in recommend_rows:
        if row.get("status") not in ("ok", "margin_ok"):
            continue
        ths_key = (row.get("ths") or "").strip()
        if not ths_key or ths_key in seen:
            continue
        product = _product_for_ths(ths_key)
        if not product:
            continue
        seen.add(ths_key)
        item = _item_from_recommend_row(row, product)
        if not item:
            continue
        buckets[product["exchange"]].append(item)
    return _groups_in_exchange_order(buckets)


def list_main_contracts_grouped() -> list[dict]:
    """Return the main contract of every product, grouped by exchange.

    Used by the quotes-page drop-down. If fewer than half of the products are
    indexed, a synchronous refresh is triggered first; products still missing
    afterwards are resolved individually.
    """
    with _main_index_lock:
        index = dict(_main_index)
    if len(index) < len(PRODUCTS) // 2:
        refresh_main_index()
        with _main_index_lock:
            index = dict(_main_index)

    buckets: dict[str, list] = defaultdict(list)
    for p in PRODUCTS:
        main = index.get(p["sina"])
        if not main:
            resolved = resolve_main_contract(p)
            if resolved:
                main = _enrich_item(resolved)
        if main:
            buckets[p["exchange"]].append(main)
    return _groups_in_exchange_order(buckets)


# Import-time side effect: begin warming the main-contract index in the background.
_start_warm_thread()


def get_price(market_code: str, sina_code: str = "") -> Optional[float]:
    """Return the latest price by delegating to ``market.get_price``."""
    return market_get_price(market_code, sina_code)