"""从 CTP 柜台同步手续费率(SimNow / 期货公司)。""" from __future__ import annotations import logging import re import time from typing import Optional from contract_specs import get_contract_spec from fee_specs import upsert_fee_rate from vnpy_bridge import get_bridge logger = logging.getLogger(__name__) def _product_from_instrument(instrument_id: str) -> str: m = re.match(r"^([A-Za-z]+)", instrument_id or "") return m.group(1).lower() if m else "" def ctp_commission_to_fee_fields(data: dict, ths_code: str) -> dict: """CTP OnRspQryInstrumentCommissionRate → fee_rates 字段。""" mult = int(get_contract_spec(ths_code)["mult"]) exchange = str(data.get("ExchangeID") or "").strip() return { "exchange": exchange, "mult": mult, "open_fixed": float(data.get("OpenRatioByVolume") or 0), "open_ratio": float(data.get("OpenRatioByMoney") or 0), "close_yesterday_fixed": float(data.get("CloseRatioByVolume") or 0), "close_yesterday_ratio": float(data.get("CloseRatioByMoney") or 0), "close_today_fixed": float(data.get("CloseTodayRatioByVolume") or 0), "close_today_ratio": float(data.get("CloseTodayRatioByMoney") or 0), "source": "ctp", } def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]: """CTP 已连接时,按主力合约查询手续费并写入 fee_rates(source=ctp)。""" bridge = get_bridge() if not bridge.available(): return 0, "vnpy 未安装" if bridge.connected_mode != mode: return 0, "请先连接 CTP" if not bridge.ping(): return 0, "CTP 连接无效,请重连" from symbols import list_main_contracts_grouped mains = list_main_contracts_grouped() symbols: list[str] = [] for g in mains: ths = (g.get("ths") or g.get("code") or "").strip() if ths: symbols.append(ths) symbols = symbols[:max_symbols] if not symbols: return 0, "无主力合约列表" seen: set[str] = set() ok = 0 errors = 0 for ths in symbols: product = _product_from_instrument(ths) if not product or product in seen: continue seen.add(product) try: raw = bridge.query_instrument_commission(ths, mode=mode) if not raw: errors += 1 continue fields = ctp_commission_to_fee_fields(raw, ths) upsert_fee_rate(product, fields) ok += 1 time.sleep(0.35) except Exception as exc: logger.debug("CTP fee sync %s: %s", ths, exc) errors += 1 if ok == 0: return 0, f"CTP 未返回手续费率(失败 {errors} 次),请确认柜台支持查询" msg = f"已从 CTP 同步 {ok} 个品种手续费" if errors: msg += f"({errors} 个跳过)" return ok, msg def sync_fee_for_symbol(mode: str, ths_code: str) -> Optional[dict]: """单品种按需从 CTP 拉取并缓存。""" bridge = get_bridge() if bridge.connected_mode != mode or not bridge.ping(): return None raw = bridge.query_instrument_commission(ths_code, mode=mode) if not raw: return None product = _product_from_instrument(ths_code) if not product: return None fields = ctp_commission_to_fee_fields(raw, ths_code) upsert_fee_rate(product, fields) return fields