diff --git a/app.py b/app.py index 3f5420e..f884b47 100644 --- a/app.py +++ b/app.py @@ -29,15 +29,10 @@ from contract_specs import calc_position_metrics from fee_specs import ( calc_fee_breakdown, calc_round_trip_fee, - get_fee_multiplier, - get_fee_source_mode, - list_all_fee_rates, list_fee_rates_for_ui, count_fee_rates_by_source, - load_fee_rates_from_json, - upsert_fee_rate, + purge_non_ctp_fee_rates, ) -from fee_sync import sync_fees_from_akshare from nav_settings import NAV_TOGGLES, get_nav_items, nav_enabled, save_nav_items from contract_profile import get_contract_profile from stats_engine import STATS_VIEWS, load_stats_cache, refresh_stats_cache @@ -376,11 +371,11 @@ def init_db(): set_setting("risk_percent", "1") if not get_setting("fee_source_mode"): set_setting("fee_source_mode", "ctp") - conn = get_db() - fee_cnt = conn.execute("SELECT COUNT(*) FROM fee_rates").fetchone()[0] - conn.close() - if fee_cnt == 0: - load_fee_rates_from_json() + set_setting("fee_source_mode", "ctp") + try: + purge_non_ctp_fee_rates() + except Exception: + pass def sync_admin_from_env(): @@ -1565,66 +1560,32 @@ def api_contract_profile(): @require_nav("fees") def fees(): from trading_context import get_trading_mode - from ctp_fee_sync import sync_fees_from_ctp + from ctp_fee_worker import try_daily_ctp_fee_sync, get_fee_last_sync, fees_synced_today from vnpy_bridge import ctp_status mode = get_trading_mode(get_setting) if request.method == "POST": action = request.form.get("action") - if action == "fee_source": - fs = request.form.get("fee_source_mode", "ctp").strip() - set_setting("fee_source_mode", fs if fs in ("ctp", "local") else "ctp") - flash("手续费数据源已保存") - elif action == "sync_ctp": - count, msg = sync_fees_from_ctp(mode) + if action == "sync_ctp": + force = request.form.get("force") == "1" + count, msg = try_daily_ctp_fee_sync( + mode, + get_setting=get_setting, + set_setting=set_setting, + force=force, + ) flash(msg) - elif action == "multiplier": - try: - mult = float(request.form.get("fee_multiplier", "2")) - if mult < 0: - flash("倍率不能为负数") - else: - set_setting("fee_multiplier", str(mult)) - flash(f"手续费倍率已保存:标准 × {mult}") - except ValueError: - flash("请输入有效倍率") - elif action == "sync": - mult = float(get_setting("fee_multiplier", "2") or 2) - count, msg = sync_fees_from_akshare(mult) - flash(msg if count else msg) - elif action == "reload_json": - n = load_fee_rates_from_json() - flash(f"已从本地 JSON 加载 {n} 个品种费率") - elif action == "save_row": - product = request.form.get("product", "").strip().lower() - if not product: - flash("品种代码不能为空") - else: - upsert_fee_rate(product, { - "exchange": request.form.get("exchange", "").strip(), - "mult": int(request.form.get("mult") or 10), - "open_fixed": float(request.form.get("open_fixed") or 0), - "open_ratio": float(request.form.get("open_ratio") or 0), - "close_yesterday_fixed": float(request.form.get("close_yesterday_fixed") or 0), - "close_yesterday_ratio": float(request.form.get("close_yesterday_ratio") or 0), - "close_today_fixed": float(request.form.get("close_today_fixed") or 0), - "close_today_ratio": float(request.form.get("close_today_ratio") or 0), - "source": "manual", - }) - flash(f"已保存 {product} 费率") return redirect(url_for("fees")) rates = list_fee_rates_for_ui() fee_counts = count_fee_rates_by_source() - multiplier = get_setting("fee_multiplier", "2") - fee_source_mode = get_fee_source_mode() ctp_st = ctp_status(mode) return render_template( "fees.html", rates=rates, fee_counts=fee_counts, - multiplier=multiplier, - fee_source_mode=fee_source_mode, + fee_last_sync=get_fee_last_sync(get_setting), + fee_synced_today=fees_synced_today(get_setting), ctp_connected=bool(ctp_st.get("connected")), ) diff --git a/ctp_fee_sync.py b/ctp_fee_sync.py index 0737744..394d3ab 100644 --- a/ctp_fee_sync.py +++ b/ctp_fee_sync.py @@ -58,7 +58,7 @@ def _collect_main_ths_codes() -> list[str]: def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]: - """CTP 已连接时,按主力合约查询手续费并写入 fee_rates(source=ctp)。""" + """CTP 已连接时查询手续费并写入 fee_rates(source=ctp,覆盖同品种旧数据)。""" bridge = get_bridge() if not bridge.available(): return 0, "vnpy 未安装" diff --git a/ctp_fee_worker.py b/ctp_fee_worker.py new file mode 100644 index 0000000..1ff36f1 --- /dev/null +++ b/ctp_fee_worker.py @@ -0,0 +1,90 @@ +"""CTP 手续费后台同步:每日一次写入数据库,前端只读展示。""" +from __future__ import annotations + +import logging +import threading +import time +from datetime import date, datetime +from typing import Callable, Optional +from zoneinfo import ZoneInfo + +logger = logging.getLogger(__name__) + +TZ = ZoneInfo("Asia/Shanghai") +FEE_SYNC_KEY = "ctp_fee_last_sync" +CHECK_INTERVAL_SEC = 3600 +_sync_lock = threading.Lock() + + +def _today_str() -> str: + return datetime.now(TZ).date().isoformat() + + +def get_fee_last_sync(get_setting: Callable[[str, str], str]) -> str: + return (get_setting(FEE_SYNC_KEY, "") or "").strip() + + +def fees_synced_today(get_setting: Callable[[str, str], str]) -> bool: + last = get_fee_last_sync(get_setting) + return bool(last) and last[:10] == _today_str() + + +def mark_fees_synced(set_setting: Callable[[str, str], None]) -> None: + set_setting(FEE_SYNC_KEY, datetime.now(TZ).isoformat(timespec="seconds")) + + +def try_daily_ctp_fee_sync( + mode: str, + *, + get_setting: Callable[[str, str], str], + set_setting: Callable[[str, str], None], + force: bool = False, +) -> tuple[int, str]: + """CTP 已连接且今日未同步时拉取费率入库;force=True 忽略日期限制。""" + if not force and fees_synced_today(get_setting): + return 0, "今日已从 CTP 同步过,无需重复(可点「立即同步」强制刷新)" + + with _sync_lock: + if not force and fees_synced_today(get_setting): + return 0, "今日已从 CTP 同步过" + + from ctp_fee_sync import sync_fees_from_ctp + + count, msg = sync_fees_from_ctp(mode) + if count > 0: + mark_fees_synced(set_setting) + logger.info("CTP 手续费每日同步: %s", msg) + elif force: + logger.warning("CTP 手续费强制同步未写入: %s", msg) + return count, msg + + +def start_ctp_fee_worker( + *, + get_mode_fn: Callable[[], str], + get_setting_fn: Callable[[str, str], str], + set_setting_fn: Callable[[str, str], None], + interval: int = CHECK_INTERVAL_SEC, +) -> None: + """后台线程:每小时检查,CTP 已连接且当日未同步则自动同步。""" + + def _loop() -> None: + time.sleep(20) + while True: + try: + from vnpy_bridge import ctp_status + + mode = get_mode_fn() + st = ctp_status(mode) + if st.get("connected") and not fees_synced_today(get_setting_fn): + try_daily_ctp_fee_sync( + mode, + get_setting=get_setting_fn, + set_setting=set_setting_fn, + force=False, + ) + except Exception as exc: + logger.warning("CTP fee worker: %s", exc) + time.sleep(max(300, interval)) + + threading.Thread(target=_loop, daemon=True, name="ctp-fee-worker").start() diff --git a/fee_specs.py b/fee_specs.py index 56f0ae1..a67db0c 100644 --- a/fee_specs.py +++ b/fee_specs.py @@ -1,4 +1,4 @@ -"""期货手续费:优先 CTP 柜台费率,本地/AKShare 为离线兜底。""" +"""期货手续费:仅 CTP 柜台同步入库,前端只读展示。""" import json import os import re @@ -37,6 +37,26 @@ def _get_db(): return connect_db() +def get_setting(key: str, default: str = "") -> str: + conn = _get_db() + row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone() + conn.close() + if not row: + return default + return (row["value"] or default) if row["value"] is not None else default + + +def set_setting(key: str, value: str) -> None: + conn = _get_db() + conn.execute( + """INSERT INTO settings (key, value) VALUES (?,?) + ON CONFLICT(key) DO UPDATE SET value=excluded.value""", + (key, value), + ) + conn.commit() + conn.close() + + def get_fee_multiplier() -> float: conn = _get_db() row = conn.execute( @@ -52,14 +72,20 @@ def get_fee_multiplier() -> float: def get_fee_source_mode() -> str: - """ctp=优先柜台同步费率;local=本地/AKShare 表。""" + """固定 CTP 柜台。""" + return "ctp" + + +def purge_non_ctp_fee_rates() -> int: + """删除非 CTP 来源的费率缓存。""" conn = _get_db() - row = conn.execute( - "SELECT value FROM settings WHERE key='fee_source_mode'" - ).fetchone() + cur = conn.execute( + "DELETE FROM fee_rates WHERE COALESCE(source, '') != 'ctp'" + ) + n = cur.rowcount + conn.commit() conn.close() - mode = (row["value"] if row else "ctp") or "ctp" - return mode if mode in ("ctp", "local") else "ctp" + return n def _row_to_spec(row, mult: int) -> dict: @@ -84,46 +110,21 @@ def get_fee_spec(ths_code: str, *, trading_mode: str = "simulation") -> dict: return {**DEFAULT_FEE, "mult": spec["mult"], "product": "", "exchange": "", "source": "default"} mult = get_contract_spec(ths_code)["mult"] - source_mode = get_fee_source_mode() conn = _get_db() - - if source_mode == "ctp": - row = conn.execute( - "SELECT * FROM fee_rates WHERE product=? AND source='ctp'", - (product,), - ).fetchone() - if not row: - row = conn.execute( - "SELECT * FROM fee_rates WHERE product=? ORDER BY CASE source WHEN 'ctp' THEN 0 ELSE 1 END", - (product,), - ).fetchone() - conn.close() - if row: - return _row_to_spec(row, mult) - # 按需向 CTP 查询 - try: - from ctp_fee_sync import sync_fee_for_symbol - fields = sync_fee_for_symbol(trading_mode, ths_code) - if fields: - return {"product": product, **fields} - except Exception: - pass - conn = _get_db() - row = conn.execute( - "SELECT * FROM fee_rates WHERE product=?", (product,) - ).fetchone() - conn.close() - if row: - spec = _row_to_spec(row, mult) - spec["source"] = spec.get("source") or "local_fallback" - return spec - else: - row = conn.execute( - "SELECT * FROM fee_rates WHERE product=?", (product,) - ).fetchone() - conn.close() - if row: - return _row_to_spec(row, mult) + row = conn.execute( + "SELECT * FROM fee_rates WHERE product=? AND source='ctp'", + (product,), + ).fetchone() + conn.close() + if row: + return _row_to_spec(row, mult) + try: + from ctp_fee_sync import sync_fee_for_symbol + fields = sync_fee_for_symbol(trading_mode, ths_code) + if fields: + return {"product": product, **fields} + except Exception: + pass if product in _INDEX_PRODUCTS: return { @@ -287,6 +288,16 @@ def load_fee_rates_from_json(path: Optional[str] = None) -> int: return count +def list_ctp_fee_rates() -> list: + """手续费页:仅展示 CTP 同步结果。""" + conn = _get_db() + rows = conn.execute( + "SELECT * FROM fee_rates WHERE source='ctp' ORDER BY product" + ).fetchall() + conn.close() + return [dict(r) for r in rows] + + def list_all_fee_rates() -> list: conn = _get_db() rows = conn.execute( @@ -297,28 +308,16 @@ def list_all_fee_rates() -> list: def list_fee_rates_for_ui() -> list: - """手续费页展示:CTP 模式下 ctp 来源优先排前。""" - rows = list_all_fee_rates() - if get_fee_source_mode() == "ctp": - rows.sort( - key=lambda r: ( - 0 if (r.get("source") or "") == "ctp" else 1, - r.get("product") or "", - ) - ) - return rows + return list_ctp_fee_rates() def count_fee_rates_by_source() -> dict[str, int]: conn = _get_db() - rows = conn.execute( - "SELECT source, COUNT(*) AS n FROM fee_rates GROUP BY source" - ).fetchall() + n = conn.execute( + "SELECT COUNT(*) FROM fee_rates WHERE source='ctp'" + ).fetchone()[0] conn.close() - out: dict[str, int] = {} - for row in rows: - out[str(row["source"] or "local")] = int(row["n"] or 0) - return out + return {"ctp": int(n or 0)} def upsert_fee_rate(product: str, fields: dict) -> None: diff --git a/install_trading.py b/install_trading.py index a3d7738..4d5be0a 100644 --- a/install_trading.py +++ b/install_trading.py @@ -20,6 +20,7 @@ from position_sizing import ( from recommend_store import load_recommend_cache, refresh_recommend_cache from recommend_stream import recommend_hub, start_recommend_worker from ctp_reconnect import start_ctp_reconnect_worker +from ctp_fee_worker import start_ctp_fee_worker from risk.account_risk_lib import ( assert_can_open, get_risk_status, @@ -1016,3 +1017,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se init_tables_fn=_init_tables, ) start_ctp_reconnect_worker(get_mode_fn=lambda: get_trading_mode(get_setting)) + start_ctp_fee_worker( + get_mode_fn=lambda: get_trading_mode(get_setting), + get_setting_fn=get_setting, + set_setting_fn=set_setting, + ) diff --git a/templates/fees.html b/templates/fees.html index 87b2d24..f9db460 100644 --- a/templates/fees.html +++ b/templates/fees.html @@ -2,82 +2,46 @@ {% block title %}手续费配置 - 国内期货监控系统{% endblock %} {% block extra_css %} {% endblock %} {% block content %} -
-
-

手续费数据源

-
-
- -
- - -
- -
-

- 默认使用 CTP 柜台 费率(连接后自动同步,与 SimNow/期货公司一致)。 -

-
-
- - -
- {% if ctp_connected %} - CTP 已连接 - {% else %} - CTP 未连接 - {% endif %} -
- {% if fee_counts %} -

- 已缓存: - {% if fee_counts.get('ctp') %}CTP {{ fee_counts.ctp }}{% endif %} - {% if fee_counts.get('local') %}local {{ fee_counts.local }}{% endif %} - {% if fee_counts.get('json') %}json {{ fee_counts.json }}{% endif %} - {% if fee_counts.get('manual') %}manual {{ fee_counts.manual }}{% endif %} -

- {% endif %} -
-
- -
-

本地参考倍率

-
-

仅「本地数据源」时使用

-
- - - - -
-
-
- - -
-
- - -
-
-
+
+

CTP 手续费

+
+

+ 费率由后台从 CTP 柜台 同步写入数据库,每日自动更新一次,本页只读展示。 +

+ {% if ctp_connected %} + CTP 已连接 + {% else %} + CTP 未连接 + {% endif %} + {% if fee_synced_today %} + 今日已同步 + {% else %} + 今日未同步 + {% endif %} + {% if fee_last_sync %} + 上次:{{ fee_last_sync[:16] }} + {% endif %} + {% if fee_counts.get('ctp') %} + 共 {{ fee_counts.ctp }} 个品种 + {% endif %} +
+ + + +
@@ -88,47 +52,38 @@ - + - + {% for r in rates %} - {% set fid = 'fee-row-' ~ r.product %} - - - - - - - - - + + + + + + + + - {% else %} - + {% endfor %}
品种来源交易所乘数品种交易所乘数 开仓(元/手)开仓(比例) 平昨(元/手)平昨(比例) 平今(元/手)平今(比例)更新操作更新
{{ r.product }}{{ r.source or 'local' }}{{ r.exchange or '—' }}{{ r.mult }}{{ r.open_fixed }}{{ r.open_ratio }}{{ r.close_yesterday_fixed }}{{ r.close_yesterday_ratio }}{{ r.close_today_fixed }}{{ r.close_today_ratio }} {{ (r.updated_at or '')[:16] }}
暂无费率,请连接 CTP 后同步
暂无 CTP 费率,请连接 CTP 后等待自动同步或点击「立即同步」
- {% for r in rates %} - - {% endfor %}

公式:单边 = 固定(元/手)×手数 + 比例×价格×乘数×手数;往返 = 开仓 + 平仓(平今/平昨自动判断)。 - {% if fee_source_mode == 'ctp' and ctp_connected and not fee_counts.get('ctp') %} -
当前无 CTP 费率缓存,请点击「从 CTP 同步费率」。 + {% if ctp_connected and not fee_counts.get('ctp') %} +
数据库尚无 CTP 费率,请点击「立即同步」或等待后台每日任务。 {% endif %}

diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 9181219..627acd6 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -223,15 +223,30 @@ class CtpBridge: self._connected_mode = None def _schedule_fee_sync(self, mode: str) -> None: + """连接成功后触发每日同步检查(非每次全量)。""" + def _run() -> None: try: - from ctp_fee_sync import sync_fees_from_ctp - n, msg = sync_fees_from_ctp(mode, max_symbols=60) - logger.info("CTP 手续费同步: %s", msg if n else msg) - except Exception as exc: - logger.debug("CTP 手续费后台同步: %s", exc) + from ctp_fee_worker import try_daily_ctp_fee_sync - threading.Thread(target=_run, daemon=True, name="ctp-fee-sync").start() + def _gs(key: str, default: str = "") -> str: + from fee_specs import get_setting + return get_setting(key, default) + + def _ss(key: str, val: str) -> None: + from fee_specs import set_setting + set_setting(key, val) + + try_daily_ctp_fee_sync( + mode, + get_setting=_gs, + set_setting=_ss, + force=False, + ) + except Exception as exc: + logger.debug("CTP 手续费连接后检查: %s", exc) + + threading.Thread(target=_run, daemon=True, name="ctp-fee-sync-check").start() def _ensure_commission_callback(self) -> None: if self._commission_hooked or not self._engine: