# Copyright (c) 2025-2026 马建军. All rights reserved.
# Proprietary software — copying, distribution, or resale without authorization is prohibited.
# Strictly prohibited uses: signal-calling / managing money on behalf of clients,
# recommending futures products or buy/sell advice to others, margin financing /
# leveraged fund allocation, and similar businesses.
# See LICENSE.zh-CN.txt and docs/软件购买与使用协议.md for details.
"""Filter openable products by account capital (margin and position discipline)."""
from __future__ import annotations

import logging
import math
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Optional

from modules.core.contract_specs import get_contract_spec, margin_one_lot
from modules.fees.fee_specs import calc_fee_breakdown
from modules.trading.recommend_trend import analyze_product_daily, sort_recommend_by_trend
from modules.core.symbols import PRODUCTS, product_category, product_has_night_session

logger = logging.getLogger(__name__)

# When equity does not exceed this value, only the whitelisted products below
# are allowed (openable list, product drop-down, open-order submission).
SMALL_ACCOUNT_CAPITAL_MAX = 200_000.0
# When CTP is not connected, the openable-product table estimates max lots with
# this equity (independent of the reference-capital setting).
DISCONNECTED_RECOMMEND_CAPITAL = 100_000.0
SMALL_ACCOUNT_PRODUCT_THS = frozenset({"c", "m", "MA", "rb"})
SMALL_ACCOUNT_SCOPE_LABEL = "玉米、豆粕、甲醇、螺纹钢"
SMALL_ACCOUNT_RECOMMENDED_OPEN_MARGIN_PCT = 30.0
SMALL_ACCOUNT_RECOMMENDED_ROLL_MARGIN_PCT = 40.0

# Leading ASCII letters of a contract/product code (e.g. "rb" in "rb2510").
_THS_LETTERS_RE = re.compile(r"^([A-Za-z]+)")


def _capital_in_wan(amount: float) -> int:
    """Return *amount* expressed in whole units of 10,000 (floor)."""
    return int(amount // 10_000)


def small_account_margin_recommendations() -> dict:
    """Return the suggested margin percentages for small accounts.

    The dict carries ``open_margin_pct``, ``roll_margin_pct`` and a
    human-readable ``label`` built from the module constants.
    """
    wan = _capital_in_wan(SMALL_ACCOUNT_CAPITAL_MAX)
    return {
        "open_margin_pct": SMALL_ACCOUNT_RECOMMENDED_OPEN_MARGIN_PCT,
        "roll_margin_pct": SMALL_ACCOUNT_RECOMMENDED_ROLL_MARGIN_PCT,
        "label": (
            f"权益 {wan} 万以下建议:开仓保证金上限 "
            f"{int(SMALL_ACCOUNT_RECOMMENDED_OPEN_MARGIN_PCT)}%,"
            f"滚仓总保证金不超过 {int(SMALL_ACCOUNT_RECOMMENDED_ROLL_MARGIN_PCT)}%"
        ),
    }


def small_account_scope_hint(*, ctp_connected: bool = True) -> str:
    """Return the hint text describing the small-account product scope.

    A different wording is used when CTP is not connected, mentioning the
    equity used for the max-lot estimate.
    """
    if not ctp_connected:
        rec_wan = _capital_in_wan(DISCONNECTED_RECOMMEND_CAPITAL)
        return (
            f"未连接 CTP,按 {rec_wan} 万权益估算最大手数,"
            f"仅显示并可交易 {SMALL_ACCOUNT_SCOPE_LABEL}"
        )
    wan = _capital_in_wan(SMALL_ACCOUNT_CAPITAL_MAX)
    return f"权益 {wan} 万以下仅显示并可交易:{SMALL_ACCOUNT_SCOPE_LABEL}"


def small_account_scope_status_label() -> str:
    """Return the short status label used for rows blocked by the scope rule."""
    wan = _capital_in_wan(SMALL_ACCOUNT_CAPITAL_MAX)
    return f"权益{wan}万以下限{SMALL_ACCOUNT_SCOPE_LABEL}"


def should_apply_small_account_scope(
    capital: float,
    *,
    ctp_connected: bool,
) -> bool:
    """Tell whether the small-account product whitelist applies.

    The whitelist always applies while CTP is not connected; otherwise it
    applies only when ``is_small_account(capital)`` is true.
    """
    if not ctp_connected:
        return True
    return is_small_account(capital)


def filter_rows_for_account_scope(
    rows: list[dict],
    capital: float,
    *,
    ctp_connected: bool,
) -> list[dict]:
    """Keep only whitelisted rows when the small-account scope applies.

    Rows are matched on their ``"ths"`` value. When the scope does not apply
    the input list object itself is returned (not a copy).
    """
    if not should_apply_small_account_scope(capital, ctp_connected=ctp_connected):
        return rows
    return [r for r in rows if product_in_small_account_whitelist(r.get("ths") or "")]


def normalize_product_ths(ths: str) -> str:
    """Return the leading ASCII letters of *ths*.

    Falls back to the stripped input when it does not start with a letter.
    """
    stripped = (ths or "").strip()
    match = _THS_LETTERS_RE.match(stripped)
    return match.group(1) if match else stripped


def is_small_account(capital: float) -> bool:
    """Return True when ``0 < capital <= SMALL_ACCOUNT_CAPITAL_MAX``."""
    cap = float(capital or 0)
    return 0 < cap <= SMALL_ACCOUNT_CAPITAL_MAX


def product_in_small_account_whitelist(ths_or_product) -> bool:
    """Check whether a product belongs to the small-account whitelist.

    Accepts either a product dict (its ``"ths"`` value is used) or a
    product/contract code. Matching is done on the leading letters of the
    code and is case-insensitive.
    """
    if isinstance(ths_or_product, dict):
        raw = ths_or_product.get("ths")
    else:
        raw = ths_or_product
    # str() keeps a non-string "ths" value from raising on .strip().
    root = normalize_product_ths(str(raw or ""))
    if not root:
        return False
    return root.upper() in {code.upper() for code in SMALL_ACCOUNT_PRODUCT_THS}


def assert_product_allowed_for_capital(
    ths: str,
    capital: float,
    *,
    ctp_connected: bool = True,
) -> Optional[str]:
    """Validate a product against the small-account whitelist.

    Returns ``None`` when the product may be traded, otherwise a
    user-facing message explaining the restriction.
    """
    if not should_apply_small_account_scope(capital, ctp_connected=ctp_connected):
        return None
    if product_in_small_account_whitelist(ths):
        return None
    if not ctp_connected:
        return f"未连接 CTP,仅可交易:{SMALL_ACCOUNT_SCOPE_LABEL}"
    wan = _capital_in_wan(SMALL_ACCOUNT_CAPITAL_MAX)
    return f"权益 {wan} 万以下仅可交易:{SMALL_ACCOUNT_SCOPE_LABEL}"


def filter_products_for_capital(
    products: list[dict],
    capital: float,
    *,
    ctp_connected: bool = True,
) -> list[dict]:
    """Return the products tradable under the small-account scope rule.

    A new list is always returned; it is a shallow copy of *products* when
    the scope does not apply.
    """
    if not should_apply_small_account_scope(capital, ctp_connected=ctp_connected):
        return list(products)
    return [p for p in products if product_in_small_account_whitelist(p)]


def _attach_turnover(row: dict) -> None:
    """Set ``row["turnover"]`` = volume × price × contract multiplier.

    The price is ``prev_close`` when present, else ``price``. The row is left
    untouched when any factor is missing, non-numeric or not positive.
    """
    try:
        vol = float(row.get("volume") or 0)
        price = float(row.get("prev_close") or row.get("price") or 0)
        mult = float(row.get("mult") or 0)
    except (TypeError, ValueError):
        return
    if vol > 0 and price > 0 and mult > 0:
        row["turnover"] = round(vol * price * mult, 2)


def _letters_from_ths(ths_code: str) -> str:
    """Return the leading ASCII letters of *ths_code*, or ``""`` if none."""
    match = _THS_LETTERS_RE.match((ths_code or "").strip())
    return match.group(1) if match else ""


def _unavailable_row(
    product: dict,
    *,
    ths: str,
    name: str,
    exchange: str,
    category: str,
    mult,
    tick: float,
    status: str,
    status_label: str,
) -> dict:
    """Build the result row for a product that cannot be sized (no lots)."""
    return {
        "ths": ths,
        "name": name,
        "exchange": exchange,
        "category": category,
        "mult": mult,
        "tick_size": tick,
        "status": status,
        "status_label": status_label,
        "min_capital_one_lot": None,
        "margin_one_lot": None,
        "max_lots": 0,
        "risk_one_lot_1pct": None,
        "has_night_session": product_has_night_session(product),
    }


def assess_product_for_capital(
    product: dict,
    capital: float,
    price: Optional[float],
    *,
    max_margin_pct: float = 30.0,
    default_stop_ticks: int = 20,
    reward_risk_ratio: float = 2.0,
    trading_mode: str = "simulation",
    ctp_connected: bool = True,
    main_code: str = "",
    margin_used: float = 0.0,
) -> dict:
    """Assess whether a single product is tradable with the given capital.

    Args:
        product: Product dict; ``ths``, ``name``, ``exchange`` and
            ``category`` are read from it.
        capital: Account equity used for sizing.
        price: Latest price; a missing or non-positive price yields a
            ``"no_price"`` row.
        max_margin_pct: Share of capital usable as margin, clamped to
            [1, 100]; a falsy value falls back to 30.
        default_stop_ticks: Stop distance in ticks for the one-lot risk figure.
        reward_risk_ratio: Multiplier applied to the stop distance to derive
            the reference take-profit.
        trading_mode: Forwarded to the margin and fee helpers.
        ctp_connected: When true, margin comes from ``margin_one_lot``;
            otherwise from price × multiplier × spec margin rate.
        main_code: Contract code used for the margin lookup; defaults to
            ``ths + "8888"``.
        margin_used: Margin already occupied, subtracted from the budget.

    Returns:
        A row dict whose ``status`` is one of ``"ok"``, ``"margin_ok"``,
        ``"blocked"`` or ``"no_price"``.
    """
    ths = product.get("ths") or ""
    name = product.get("name") or ths
    exchange = product.get("exchange") or ""
    category = product.get("category") or product_category(ths)
    spec = get_contract_spec(ths + "8888")
    mult = spec["mult"]
    margin_rate = spec["margin_rate"]
    tick = float(spec.get("tick_size") or 1.0)
    p = float(price) if price and price > 0 else 0.0
    cap = float(capital or 0)
    margin_pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))

    scope_applies = should_apply_small_account_scope(cap, ctp_connected=ctp_connected)
    if scope_applies and not product_in_small_account_whitelist(product):
        return _unavailable_row(
            product,
            ths=ths,
            name=name,
            exchange=exchange,
            category=category,
            mult=mult,
            tick=tick,
            status="blocked",
            status_label=small_account_scope_status_label(),
        )
    if p <= 0:
        return _unavailable_row(
            product,
            ths=ths,
            name=name,
            exchange=exchange,
            category=category,
            mult=mult,
            tick=tick,
            status="no_price",
            status_label="暂无行情",
        )

    margin_source = None
    code_for_margin = (main_code or "").strip() or (ths + "8888")
    if ctp_connected:
        margin_one, margin_source, spec_used = margin_one_lot(
            code_for_margin,
            p,
            direction="max",
            trading_mode=trading_mode,
        )
        # Prefer the multiplier / tick size reported alongside the margin.
        if spec_used.get("mult"):
            mult = spec_used["mult"]
        if spec_used.get("tick_size"):
            tick = float(spec_used["tick_size"])
    else:
        margin_one = p * mult * margin_rate

    # margin_pct is clamped to >= 1 above, so the division is always safe.
    min_capital = margin_one / (margin_pct / 100.0)
    margin_budget = cap * margin_pct / 100.0 if cap > 0 else 0.0
    margin_budget = max(0.0, margin_budget - max(0.0, float(margin_used or 0)))
    if margin_one > 0 and margin_budget > 0:
        max_lots = int(math.floor(margin_budget / margin_one))
    else:
        max_lots = 0

    stop_dist = tick * default_stop_ticks
    risk_one_lot = stop_dist * mult
    risk_pct_1lot = (risk_one_lot / cap * 100) if cap > 0 else 999.0
    ref_sl = round(p - stop_dist, 4)
    ref_tp = round(p + stop_dist * reward_risk_ratio, 4)

    fee_ths = ths + "8888"
    try:
        fee_info = calc_fee_breakdown(
            fee_ths,
            p,
            p,
            1.0,
            open_time="",
            close_time="",
            trading_mode=trading_mode,
        )
    except Exception as exc:
        # Fees are informational only; fall back to zero rather than fail the row.
        logger.debug("recommend fee calc failed %s: %s", ths, exc)
        fee_info = {"open_fee": 0.0, "total_fee": 0.0}

    can_margin = max_lots >= 1
    # One-lot stop risk must stay within 1% of capital.
    can_risk = cap > 0 and risk_one_lot <= cap * 0.01
    if can_margin and can_risk:
        status, label = "ok", f"最大 {max_lots} 手"
    elif can_margin:
        status, label = "margin_ok", f"最大 {max_lots} 手·止损偏宽"
    else:
        status, label = "blocked", "资金不足"
    if margin_source == "ctp" and can_margin:
        label += "(柜台保证金)"

    row_out = {
        "ths": ths,
        "name": name,
        "exchange": exchange,
        "category": category,
        "price": round(p, 4),
        "mult": mult,
        "tick_size": tick,
        "margin_one_lot": round(margin_one, 2),
        "min_capital_one_lot": round(min_capital, 2),
        "max_lots": max_lots,
        "margin_budget": round(margin_budget, 2),
        "max_margin_pct": margin_pct,
        "risk_one_lot_1pct": round(risk_one_lot, 2),
        "risk_pct_1lot_at_1pct_rule": round(risk_pct_1lot, 2),
        "ref_stop_loss": ref_sl,
        "ref_take_profit": ref_tp,
        "open_fee_one_lot": fee_info["open_fee"],
        "roundtrip_fee_one_lot": fee_info["total_fee"],
        "status": status,
        "status_label": label,
        "has_night_session": product_has_night_session(product),
    }
    if margin_source:
        row_out["margin_source"] = margin_source
    return row_out


def list_product_recommendations(
    capital: float,
    quote_fn: Callable[[str], Optional[dict]],
    *,
    max_margin_pct: float = 30.0,
    trading_mode: str = "simulation",
    ctp_connected: bool = True,
    margin_used: float = 0.0,
) -> list[dict]:
    """Assess every in-scope product and return the rows sorted for display.

    ``quote_fn(product_code)`` must return a dict with ``price`` and
    ``ths_code`` (or ``None``). Products are assessed concurrently in a
    thread pool; a failure on one product yields a ``"no_price"`` row
    labelled as a calculation failure instead of aborting the scan. Final
    ordering is delegated to ``sort_recommend_by_trend``.
    """

    def _one(product: dict) -> dict:
        ths = product["ths"]
        try:
            quote = quote_fn(ths) or {}
            price = quote.get("price")
            main_code = (quote.get("ths_code") or "").strip()
            row = assess_product_for_capital(
                product,
                capital,
                price,
                max_margin_pct=max_margin_pct,
                trading_mode=trading_mode,
                ctp_connected=ctp_connected,
                main_code=main_code,
                margin_used=margin_used,
            )
            row["main_code"] = main_code
            if main_code:
                row.update(analyze_product_daily(main_code))
            _attach_turnover(row)
            return row
        except Exception as exc:
            logger.warning("recommend product failed %s: %s", ths, exc)
            spec = get_contract_spec(ths + "8888")
            return {
                "ths": ths,
                "name": product.get("name") or ths,
                "exchange": product.get("exchange") or "",
                "category": product.get("category") or product_category(ths),
                "mult": spec["mult"],
                "tick_size": float(spec.get("tick_size") or 1.0),
                "status": "no_price",
                "status_label": "计算失败",
                "main_code": "",
                "max_lots": 0,
                "has_night_session": product_has_night_session(product),
            }

    # Forward ctp_connected so the pre-filter agrees with the per-product
    # scope check; otherwise a disconnected session with capital outside the
    # small-account range would quote and analyse every product only to
    # report it as blocked.
    products = filter_products_for_capital(
        PRODUCTS,
        capital,
        ctp_connected=ctp_connected,
    )
    with ThreadPoolExecutor(max_workers=10) as pool:
        rows = list(pool.map(_one, products))
    return sort_recommend_by_trend(rows)