# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """CTP 手续费后台同步:每日一次写入数据库,前端只读展示。""" from __future__ import annotations import logging import threading import time from datetime import date, datetime from typing import Callable, Optional from zoneinfo import ZoneInfo logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") FEE_SYNC_KEY = "ctp_fee_last_sync" CHECK_INTERVAL_SEC = 3600 _sync_lock = threading.Lock() def fee_sync_in_progress() -> bool: return _sync_lock.locked() def _today_str() -> str: return datetime.now(TZ).date().isoformat() def get_fee_last_sync(get_setting: Callable[[str, str], str]) -> str: return (get_setting(FEE_SYNC_KEY, "") or "").strip() def fees_synced_today(get_setting: Callable[[str, str], str]) -> bool: last = get_fee_last_sync(get_setting) return bool(last) and last[:10] == _today_str() def mark_fees_synced(set_setting: Callable[[str, str], None]) -> None: set_setting(FEE_SYNC_KEY, datetime.now(TZ).isoformat(timespec="seconds")) def try_daily_ctp_fee_sync( mode: str, *, get_setting: Callable[[str, str], str], set_setting: Callable[[str, str], None], force: bool = False, ) -> tuple[int, str]: """CTP 已连接且今日未同步时拉取费率入库;force=True 忽略日期限制。""" if not force and fees_synced_today(get_setting): return 0, "今日已从 CTP 同步过,无需重复(可点「立即同步」强制刷新)" with _sync_lock: if not force and fees_synced_today(get_setting): return 0, "今日已从 CTP 同步过" t0 = time.monotonic() from ctp_fee_sync import sync_fees_from_ctp count, msg = sync_fees_from_ctp(mode) elapsed = time.monotonic() - t0 if count > 0: mark_fees_synced(set_setting) msg = f"{msg}(耗时 {elapsed:.1f} 秒)" logger.info("CTP 手续费每日同步: %s", msg) elif force: msg = f"{msg}(耗时 {elapsed:.1f} 秒)" logger.warning("CTP 手续费强制同步未写入: %s", msg) return count, msg def schedule_ctp_fee_sync( mode: str, *, get_setting: Callable[[str, str], str], set_setting: Callable[[str, str], None], force: bool = False, ) -> tuple[bool, str]: """后台线程同步,避免阻塞 Web 请求。""" if _sync_lock.locked(): return False, "手续费同步进行中,请稍后再试(约 1~3 分钟)" def _run() -> None: try: try_daily_ctp_fee_sync( mode, get_setting=get_setting, set_setting=set_setting, force=force, ) except Exception as exc: logger.exception("CTP 手续费后台同步失败: %s", exc) threading.Thread(target=_run, daemon=True, name="ctp-fee-sync-run").start() if force: return True, "已在后台开始同步,约 30 秒~2 分钟完成,请稍后刷新本页查看" return True, "已在后台检查同步,请稍后刷新本页" def start_ctp_fee_worker( *, get_mode_fn: Callable[[], str], get_setting_fn: Callable[[str, str], str], set_setting_fn: Callable[[str, str], None], interval: int = CHECK_INTERVAL_SEC, ) -> None: """后台线程:每小时检查,CTP 已连接且当日未同步则自动同步。""" def _loop() -> None: time.sleep(20) while True: try: from vnpy_bridge import ctp_status mode = get_mode_fn() st = ctp_status(mode) if st.get("connected") and not fees_synced_today(get_setting_fn): try_daily_ctp_fee_sync( mode, get_setting=get_setting_fn, set_setting=set_setting_fn, force=False, ) except Exception as exc: logger.warning("CTP fee worker: %s", exc) time.sleep(max(300, interval)) threading.Thread(target=_loop, daemon=True, name="ctp-fee-worker").start()