From 3fe4add8e1e8b9af629bcb0ddbf602d4de139025 Mon Sep 17 00:00:00 2001 From: dekun Date: Wed, 24 Jun 2026 13:18:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=A1=8C=E6=83=85K=E7=BA=BF=E4=BC=98?= =?UTF-8?q?=E5=85=88CTP=20tick=E8=81=9A=E5=90=88=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D=E6=89=8B=E7=BB=AD=E8=B4=B9=E5=90=8C=E6=AD=A5=E4=B8=BB?= =?UTF-8?q?=E5=8A=9B=E5=88=97=E8=A1=A8=E8=A7=A3=E6=9E=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- app.py | 55 +++++++++++-- ctp_fee_sync.py | 32 +++++--- ctp_kline.py | 84 ++++++++++++++++++++ kline_chart.py | 32 +++++++- kline_stream.py | 13 +++- static/js/market.js | 22 +++++- templates/market.html | 2 +- vnpy_bridge.py | 174 +++++++++++++++++++++++++++++++++++++++++- 8 files changed, 390 insertions(+), 24 deletions(-) create mode 100644 ctp_kline.py diff --git a/app.py b/app.py index 4f5b62c..f8fd82a 100644 --- a/app.py +++ b/app.py @@ -428,13 +428,30 @@ def build_market_quote_payload( if codes: market_code = codes.get("market_code", "") or market_code sina_code = codes.get("sina_code", "") or sina_code - price = fetch_price(symbol, market_code, sina_code) + quote_source = "sina" + price = None + prev_close = None + try: + from vnpy_bridge import ctp_status, ctp_get_tick_detail + from trading_context import get_trading_mode + + mode = get_trading_mode(get_setting) + if ctp_status(mode).get("connected"): + detail = ctp_get_tick_detail(mode, symbol) + if detail.get("price"): + price = detail["price"] + quote_source = "ctp" + if detail.get("pre_close") is not None: + prev_close = detail["pre_close"] + except Exception: + pass + if price is None: + price = fetch_price(symbol, market_code, sina_code) name = symbol codes = ths_to_codes(symbol) if codes: name = codes.get("name", symbol) - prev_close = None - if sina_code: + if prev_close is None and sina_code: from market import fetch_raw_for_volume raw = fetch_raw_for_volume(sina_code) if raw and raw.get("prev_close") is not None: @@ -444,6 +461,7 @@ def build_market_quote_payload( "name": name, "price": price, "prev_close": prev_close, + "quote_source": quote_source, } @@ -643,9 +661,15 @@ def background_task(): def start_background_threads(): + from trading_context import get_trading_mode + threading.Thread(target=background_task, daemon=True).start() threading.Thread( - target=lambda: kline_hub.worker_loop(DB_PATH, build_market_quote_payload), + target=lambda: kline_hub.worker_loop( + DB_PATH, + build_market_quote_payload, + get_mode_fn=lambda: get_trading_mode(get_setting), + ), daemon=True, ).start() threading.Thread(target=refresh_main_index, daemon=True).start() @@ -1397,11 +1421,21 @@ def market_page(): valid = {p["key"] for p in MARKET_PERIODS} if period not in valid: period = "15m" + ctp_st = {} + try: + from vnpy_bridge import ctp_status + from trading_context import get_trading_mode + + ctp_st = ctp_status(get_trading_mode(get_setting)) + except Exception: + pass return render_template( "market.html", symbol=symbol, period=period, market_periods=MARKET_PERIODS, + quote_label=get_quote_source_label(ctp_connected=bool(ctp_st.get("connected"))), + ctp_connected=bool(ctp_st.get("connected")), ) @@ -1413,7 +1447,11 @@ def api_kline(): if not symbol: return jsonify({"error": "请提供合约代码"}), 400 try: - data = fetch_market_klines(symbol, period, DB_PATH) + from trading_context import get_trading_mode + + data = fetch_market_klines( + symbol, period, DB_PATH, trading_mode=get_trading_mode(get_setting), + ) except Exception as exc: app.logger.warning("kline api failed: %s", exc) return jsonify({"error": str(exc)}), 500 @@ -1437,9 +1475,14 @@ def api_kline_stream(): return jsonify({"error": "请提供合约代码"}), 400 def generate(): + from trading_context import get_trading_mode + + mode = get_trading_mode(get_setting) sub = kline_hub.subscribe(symbol, period, market_code, sina_code) try: - kline_data = fetch_market_klines(symbol, period, DB_PATH) + kline_data = fetch_market_klines( + symbol, period, DB_PATH, trading_mode=mode, + ) if kline_data.get("bars"): yield sse_format("kline", kline_data) yield sse_format( diff --git a/ctp_fee_sync.py b/ctp_fee_sync.py index f4d6da0..0b43e30 100644 --- a/ctp_fee_sync.py +++ b/ctp_fee_sync.py @@ -35,6 +35,28 @@ def ctp_commission_to_fee_fields(data: dict, ths_code: str) -> dict: } +def _collect_main_ths_codes() -> list[str]: + """从主力列表收集同花顺合约代码(供 CTP 手续费查询)。""" + from datetime import date + + from symbols import PRODUCTS, build_ths_code, list_main_contracts_grouped + + symbols: list[str] = [] + for group in list_main_contracts_grouped(): + for item in group.get("items") or []: + ths = (item.get("ths_code") or item.get("ths") or item.get("code") or "").strip() + if ths and not ths.endswith("888"): + symbols.append(ths) + + if symbols: + return symbols + + today = date.today() + for p in PRODUCTS: + symbols.append(build_ths_code(p, today.year, today.month)) + return symbols + + def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]: """CTP 已连接时,按主力合约查询手续费并写入 fee_rates(source=ctp)。""" bridge = get_bridge() @@ -45,15 +67,7 @@ def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]: if not bridge.ping(): return 0, "CTP 连接无效,请重连" - from symbols import list_main_contracts_grouped - - mains = list_main_contracts_grouped() - symbols: list[str] = [] - for g in mains: - ths = (g.get("ths") or g.get("code") or "").strip() - if ths: - symbols.append(ths) - symbols = symbols[:max_symbols] + symbols = _collect_main_ths_codes()[:max_symbols] if not symbols: return 0, "无主力合约列表" diff --git a/ctp_kline.py b/ctp_kline.py new file mode 100644 index 0000000..7a91ab9 --- /dev/null +++ b/ctp_kline.py @@ -0,0 +1,84 @@ +"""CTP tick 聚合 K 线(1 分钟为基础,再合成各周期)。""" +from __future__ import annotations + +import logging +from typing import Optional + +from kline_chart import ( + PERIOD_MINUTES, + _aggregate_bars, + _bar_datetime, + _merge_bars, + _timeshare_session, + _weekly_from_daily, +) + +logger = logging.getLogger(__name__) + +PERIOD_AGG = { + "2m": 2, + "3m": 3, + "5m": 5, + "15m": 15, + "30m": 30, + "1h": 60, + "2h": 120, + "4h": 240, +} + + +def _daily_from_1m(bars_1m: list) -> list: + if not bars_1m: + return [] + buckets: dict[str, list] = {} + for bar in bars_1m: + dt = _bar_datetime(bar) + if not dt: + continue + key = dt.strftime("%Y-%m-%d") + buckets.setdefault(key, []).append(bar) + out = [] + for day in sorted(buckets.keys()): + chunk = buckets[day] + merged = _merge_bars(chunk) + merged["d"] = day + " 15:00:00" + out.append(merged) + return out + + +def compose_period_bars(bars_1m: list, period: str) -> list: + p = (period or "15m").lower() + if p == "timeshare": + return _timeshare_session(bars_1m) + if p in ("1d", "d"): + return _daily_from_1m(bars_1m) + if p == "w": + return _weekly_from_daily(_daily_from_1m(bars_1m)) + if p == "1m": + return list(bars_1m) + n = PERIOD_AGG.get(p) + if n: + return _aggregate_bars(bars_1m, n) + if p in PERIOD_MINUTES: + try: + n = int(PERIOD_MINUTES[p]) + return _aggregate_bars(bars_1m, n) + except (TypeError, ValueError): + pass + return list(bars_1m) + + +def fetch_ctp_klines(symbol: str, period: str, mode: str) -> Optional[list]: + """CTP 已连接时由 tick 聚合 K 线;失败返回 None。""" + try: + from vnpy_bridge import ctp_status, get_bridge + + if not ctp_status(mode).get("connected"): + return None + bars_1m = get_bridge().get_kline_bars_1m(symbol, mode=mode) + if not bars_1m: + return None + return compose_period_bars(bars_1m, period) + except Exception as exc: + logger.debug("fetch_ctp_klines %s %s: %s", symbol, period, exc) + return None diff --git a/kline_chart.py b/kline_chart.py index 8c6ff1c..fe94dc4 100644 --- a/kline_chart.py +++ b/kline_chart.py @@ -221,6 +221,9 @@ def fetch_market_klines( period: str, db_path: Optional[str] = None, force_remote: bool = False, + *, + trading_mode: Optional[str] = None, + prefer_ctp: bool = True, ) -> dict: chart_sym = ths_to_sina_chart_symbol(symbol) p = (period or "15m").lower() @@ -232,8 +235,32 @@ def fetch_market_klines( bars: list = [] source = "remote" cached_at = None + ctp_connected = False - if db_path and chart_sym and not force_remote: + if prefer_ctp: + try: + from ctp_kline import fetch_ctp_klines + from vnpy_bridge import ctp_status + + mode = trading_mode + if not mode: + try: + from app import get_setting + from trading_context import get_trading_mode + + mode = get_trading_mode(get_setting) + except Exception: + mode = "simulation" + ctp_connected = bool(ctp_status(mode).get("connected")) + if ctp_connected: + ctp_bars = fetch_ctp_klines(symbol, p, mode) + if ctp_bars: + bars = ctp_bars + source = "ctp" + except Exception as exc: + logger.debug("ctp kline fetch failed %s %s: %s", symbol, p, exc) + + if not bars and db_path and chart_sym and not force_remote: try: conn = connect_db(db_path) cached = get_cached_entry(conn, chart_sym, p) @@ -250,7 +277,7 @@ def fetch_market_klines( if remote_bars: bars = remote_bars source = "remote" - if db_path and chart_sym: + if db_path and chart_sym and not ctp_connected: try: conn = connect_db(db_path) ensure_kline_tables(conn) @@ -290,6 +317,7 @@ def fetch_market_klines( "prev_close": prev_close, "source": source, "cached_at": cached_at, + "ctp_connected": ctp_connected, } diff --git a/kline_stream.py b/kline_stream.py index 4ba4244..f55ba0e 100644 --- a/kline_stream.py +++ b/kline_stream.py @@ -114,7 +114,12 @@ class KlineStreamHub: logger.warning("kline refresh check failed: %s", exc) return True - def worker_loop(self, db_path: str, quote_fn: Callable[..., dict]) -> None: + def worker_loop( + self, + db_path: str, + quote_fn: Callable[..., dict], + get_mode_fn: Optional[Callable[[], str]] = None, + ) -> None: while True: try: subs = self._snapshot_subs() @@ -123,7 +128,11 @@ class KlineStreamHub: continue try: kline_data = fetch_market_klines( - sub.symbol, sub.period, db_path, force_remote=True, + sub.symbol, + sub.period, + db_path, + force_remote=True, + trading_mode=get_mode_fn() if get_mode_fn else None, ) if kline_data.get("bars"): self.publish(sub, "kline", kline_data) diff --git a/static/js/market.js b/static/js/market.js index d7b7ee2..aa29c02 100644 --- a/static/js/market.js +++ b/static/js/market.js @@ -574,6 +574,12 @@ } } + function klineSourceLabel(src) { + if (src === 'ctp') return 'CTP'; + if (src === 'local') return '本地缓存'; + return '新浪'; + } + function updateRefreshHint(disconnected) { var el = document.getElementById('market-refresh-hint'); if (!el) return; @@ -589,7 +595,10 @@ el.textContent = ''; return; } - var src = lastData && lastData.source === 'local' ? ' · 本地缓存' : ''; + var src = ''; + if (lastData && lastData.source) { + src = ' · ' + klineSourceLabel(lastData.source); + } if (isTradingSession()) { el.textContent = '交易中 · 后台刷新 · SSE 推送(约1秒)' + src; } else { @@ -614,6 +623,9 @@ if (priceEl) { priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—'; } + if (data.quote_source && lastData) { + updateQuoteMeta(Object.assign({}, lastData, { quote_source: data.quote_source })); + } if (data.prev_close != null) { lastPrevClose = data.prev_close; updatePrevCloseDisplay(data.prev_close); @@ -689,7 +701,13 @@ function updateQuoteMeta(data) { var meta = document.getElementById('market-quote-meta'); if (meta) { - meta.textContent = data.count ? ('共 ' + data.count + ' 根 · ' + periodLabel(data.period)) : ''; + var parts = []; + if (data.count) parts.push('共 ' + data.count + ' 根 · ' + periodLabel(data.period)); + if (data.source) parts.push('K线 ' + klineSourceLabel(data.source)); + if (data.quote_source) { + parts.push('报价 ' + (data.quote_source === 'ctp' ? 'CTP' : '新浪')); + } + meta.textContent = parts.join(' · '); } var nameEl = document.getElementById('market-quote-name'); var hiddenName = document.getElementById('market-symbol-name'); diff --git a/templates/market.html b/templates/market.html index 3be829d..96f54bc 100644 --- a/templates/market.html +++ b/templates/market.html @@ -45,7 +45,7 @@
请选择合约并点击「查看」
连接中…
-

数据来源:新浪财经。拖拽左右平移、滚轮缩放;按住图表上下拖动可平移价格轴。可视区内自动标注最高/最低价。

+

数据来源:{% if ctp_connected %}CTP 柜台 tick 聚合(实时价与 K 线){% else %}CTP 未连接时 K 线与报价回退新浪{% endif %}。拖拽左右平移、滚轮缩放;按住图表上下拖动可平移价格轴。可视区内自动标注最高/最低价。