"""从 CTP 柜台同步手续费率(SimNow / 期货公司)。""" from __future__ import annotations import logging import re import time from typing import Optional from contract_specs import get_contract_spec from fee_specs import upsert_fee_rate from vnpy_bridge import get_bridge logger = logging.getLogger(__name__) def _product_from_instrument(instrument_id: str) -> str: m = re.match(r"^([A-Za-z]+)", instrument_id or "") return m.group(1).lower() if m else "" def ctp_commission_to_fee_fields(data: dict, ths_code: str) -> dict: """CTP OnRspQryInstrumentCommissionRate → fee_rates 字段。""" mult = int(get_contract_spec(ths_code)["mult"]) exchange = str(data.get("ExchangeID") or "").strip() return { "exchange": exchange, "mult": mult, "open_fixed": float(data.get("OpenRatioByVolume") or 0), "open_ratio": float(data.get("OpenRatioByMoney") or 0), "close_yesterday_fixed": float(data.get("CloseRatioByVolume") or 0), "close_yesterday_ratio": float(data.get("CloseRatioByMoney") or 0), "close_today_fixed": float(data.get("CloseTodayRatioByVolume") or 0), "close_today_ratio": float(data.get("CloseTodayRatioByMoney") or 0), "source": "ctp", } def _collect_main_ths_codes() -> list[str]: """从主力列表收集同花顺合约代码(供 CTP 手续费查询)。""" from datetime import date from symbols import PRODUCTS, build_ths_code, list_main_contracts_grouped symbols: list[str] = [] for group in list_main_contracts_grouped(): for item in group.get("items") or []: ths = (item.get("ths_code") or item.get("ths") or item.get("code") or "").strip() if ths and not ths.endswith("888"): symbols.append(ths) if symbols: return symbols today = date.today() for p in PRODUCTS: symbols.append(build_ths_code(p, today.year, today.month)) return symbols def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]: """CTP 已连接时查询手续费并写入 fee_rates(source=ctp,覆盖同品种旧数据)。""" bridge = get_bridge() if not bridge.available(): return 0, "vnpy 未安装" if bridge.connected_mode != mode: return 0, "请先连接 CTP" if not bridge.ping(): return 0, "CTP 连接无效,请重连" seen: set[str] = set() ok = 0 errors = 0 batch = bridge.query_all_commissions(mode=mode) if batch: for raw in batch: inst = str(raw.get("InstrumentID") or "").strip() product = _product_from_instrument(inst) if not product or product in seen: continue seen.add(product) try: fields = ctp_commission_to_fee_fields(raw, inst or product) upsert_fee_rate(product, fields) ok += 1 except Exception as exc: logger.debug("CTP fee batch %s: %s", inst, exc) errors += 1 if ok > 0: msg = f"已从 CTP 批量同步 {ok} 个品种手续费" if errors: msg += f"({errors} 个跳过)" return ok, msg symbols = _collect_main_ths_codes()[:max_symbols] if not symbols: return 0, "无主力合约列表" for ths in symbols: product = _product_from_instrument(ths) if not product or product in seen: continue seen.add(product) try: raw = bridge.query_instrument_commission(ths, mode=mode) if not raw: errors += 1 continue fields = ctp_commission_to_fee_fields(raw, ths) upsert_fee_rate(product, fields) ok += 1 time.sleep(0.35) except Exception as exc: logger.debug("CTP fee sync %s: %s", ths, exc) errors += 1 if ok == 0: return 0, f"CTP 未返回手续费率(失败 {errors} 次),请确认柜台支持查询" msg = f"已从 CTP 同步 {ok} 个品种手续费" if errors: msg += f"({errors} 个跳过)" return ok, msg def sync_fee_for_symbol(mode: str, ths_code: str) -> Optional[dict]: """单品种按需从 CTP 拉取并缓存。""" bridge = get_bridge() if bridge.connected_mode != mode or not bridge.ping(): return None raw = bridge.query_instrument_commission(ths_code, mode=mode) if not raw: return None product = _product_from_instrument(ths_code) if not product: return None fields = ctp_commission_to_fee_fields(raw, ths_code) upsert_fee_rate(product, fields) return fields