feat: 行情K线优先CTP tick聚合,修复手续费同步主力列表解析

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-24 13:18:43 +08:00
parent 09f4649d79
commit 3fe4add8e1
8 changed files with 390 additions and 24 deletions
+49 -6
View File
@@ -428,13 +428,30 @@ def build_market_quote_payload(
if codes: if codes:
market_code = codes.get("market_code", "") or market_code market_code = codes.get("market_code", "") or market_code
sina_code = codes.get("sina_code", "") or sina_code sina_code = codes.get("sina_code", "") or sina_code
price = fetch_price(symbol, market_code, sina_code) quote_source = "sina"
price = None
prev_close = None
try:
from vnpy_bridge import ctp_status, ctp_get_tick_detail
from trading_context import get_trading_mode
mode = get_trading_mode(get_setting)
if ctp_status(mode).get("connected"):
detail = ctp_get_tick_detail(mode, symbol)
if detail.get("price"):
price = detail["price"]
quote_source = "ctp"
if detail.get("pre_close") is not None:
prev_close = detail["pre_close"]
except Exception:
pass
if price is None:
price = fetch_price(symbol, market_code, sina_code)
name = symbol name = symbol
codes = ths_to_codes(symbol) codes = ths_to_codes(symbol)
if codes: if codes:
name = codes.get("name", symbol) name = codes.get("name", symbol)
prev_close = None if prev_close is None and sina_code:
if sina_code:
from market import fetch_raw_for_volume from market import fetch_raw_for_volume
raw = fetch_raw_for_volume(sina_code) raw = fetch_raw_for_volume(sina_code)
if raw and raw.get("prev_close") is not None: if raw and raw.get("prev_close") is not None:
@@ -444,6 +461,7 @@ def build_market_quote_payload(
"name": name, "name": name,
"price": price, "price": price,
"prev_close": prev_close, "prev_close": prev_close,
"quote_source": quote_source,
} }
@@ -643,9 +661,15 @@ def background_task():
def start_background_threads(): def start_background_threads():
from trading_context import get_trading_mode
threading.Thread(target=background_task, daemon=True).start() threading.Thread(target=background_task, daemon=True).start()
threading.Thread( threading.Thread(
target=lambda: kline_hub.worker_loop(DB_PATH, build_market_quote_payload), target=lambda: kline_hub.worker_loop(
DB_PATH,
build_market_quote_payload,
get_mode_fn=lambda: get_trading_mode(get_setting),
),
daemon=True, daemon=True,
).start() ).start()
threading.Thread(target=refresh_main_index, daemon=True).start() threading.Thread(target=refresh_main_index, daemon=True).start()
@@ -1397,11 +1421,21 @@ def market_page():
valid = {p["key"] for p in MARKET_PERIODS} valid = {p["key"] for p in MARKET_PERIODS}
if period not in valid: if period not in valid:
period = "15m" period = "15m"
ctp_st = {}
try:
from vnpy_bridge import ctp_status
from trading_context import get_trading_mode
ctp_st = ctp_status(get_trading_mode(get_setting))
except Exception:
pass
return render_template( return render_template(
"market.html", "market.html",
symbol=symbol, symbol=symbol,
period=period, period=period,
market_periods=MARKET_PERIODS, market_periods=MARKET_PERIODS,
quote_label=get_quote_source_label(ctp_connected=bool(ctp_st.get("connected"))),
ctp_connected=bool(ctp_st.get("connected")),
) )
@@ -1413,7 +1447,11 @@ def api_kline():
if not symbol: if not symbol:
return jsonify({"error": "请提供合约代码"}), 400 return jsonify({"error": "请提供合约代码"}), 400
try: try:
data = fetch_market_klines(symbol, period, DB_PATH) from trading_context import get_trading_mode
data = fetch_market_klines(
symbol, period, DB_PATH, trading_mode=get_trading_mode(get_setting),
)
except Exception as exc: except Exception as exc:
app.logger.warning("kline api failed: %s", exc) app.logger.warning("kline api failed: %s", exc)
return jsonify({"error": str(exc)}), 500 return jsonify({"error": str(exc)}), 500
@@ -1437,9 +1475,14 @@ def api_kline_stream():
return jsonify({"error": "请提供合约代码"}), 400 return jsonify({"error": "请提供合约代码"}), 400
def generate(): def generate():
from trading_context import get_trading_mode
mode = get_trading_mode(get_setting)
sub = kline_hub.subscribe(symbol, period, market_code, sina_code) sub = kline_hub.subscribe(symbol, period, market_code, sina_code)
try: try:
kline_data = fetch_market_klines(symbol, period, DB_PATH) kline_data = fetch_market_klines(
symbol, period, DB_PATH, trading_mode=mode,
)
if kline_data.get("bars"): if kline_data.get("bars"):
yield sse_format("kline", kline_data) yield sse_format("kline", kline_data)
yield sse_format( yield sse_format(
+23 -9
View File
@@ -35,6 +35,28 @@ def ctp_commission_to_fee_fields(data: dict, ths_code: str) -> dict:
} }
def _collect_main_ths_codes() -> list[str]:
"""从主力列表收集同花顺合约代码(供 CTP 手续费查询)。"""
from datetime import date
from symbols import PRODUCTS, build_ths_code, list_main_contracts_grouped
symbols: list[str] = []
for group in list_main_contracts_grouped():
for item in group.get("items") or []:
ths = (item.get("ths_code") or item.get("ths") or item.get("code") or "").strip()
if ths and not ths.endswith("888"):
symbols.append(ths)
if symbols:
return symbols
today = date.today()
for p in PRODUCTS:
symbols.append(build_ths_code(p, today.year, today.month))
return symbols
def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]: def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]:
"""CTP 已连接时,按主力合约查询手续费并写入 fee_ratessource=ctp)。""" """CTP 已连接时,按主力合约查询手续费并写入 fee_ratessource=ctp)。"""
bridge = get_bridge() bridge = get_bridge()
@@ -45,15 +67,7 @@ def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]:
if not bridge.ping(): if not bridge.ping():
return 0, "CTP 连接无效,请重连" return 0, "CTP 连接无效,请重连"
from symbols import list_main_contracts_grouped symbols = _collect_main_ths_codes()[:max_symbols]
mains = list_main_contracts_grouped()
symbols: list[str] = []
for g in mains:
ths = (g.get("ths") or g.get("code") or "").strip()
if ths:
symbols.append(ths)
symbols = symbols[:max_symbols]
if not symbols: if not symbols:
return 0, "无主力合约列表" return 0, "无主力合约列表"
+84
View File
@@ -0,0 +1,84 @@
"""CTP tick 聚合 K 线(1 分钟为基础,再合成各周期)。"""
from __future__ import annotations
import logging
from typing import Optional
from kline_chart import (
PERIOD_MINUTES,
_aggregate_bars,
_bar_datetime,
_merge_bars,
_timeshare_session,
_weekly_from_daily,
)
logger = logging.getLogger(__name__)
PERIOD_AGG = {
"2m": 2,
"3m": 3,
"5m": 5,
"15m": 15,
"30m": 30,
"1h": 60,
"2h": 120,
"4h": 240,
}
def _daily_from_1m(bars_1m: list) -> list:
if not bars_1m:
return []
buckets: dict[str, list] = {}
for bar in bars_1m:
dt = _bar_datetime(bar)
if not dt:
continue
key = dt.strftime("%Y-%m-%d")
buckets.setdefault(key, []).append(bar)
out = []
for day in sorted(buckets.keys()):
chunk = buckets[day]
merged = _merge_bars(chunk)
merged["d"] = day + " 15:00:00"
out.append(merged)
return out
def compose_period_bars(bars_1m: list, period: str) -> list:
p = (period or "15m").lower()
if p == "timeshare":
return _timeshare_session(bars_1m)
if p in ("1d", "d"):
return _daily_from_1m(bars_1m)
if p == "w":
return _weekly_from_daily(_daily_from_1m(bars_1m))
if p == "1m":
return list(bars_1m)
n = PERIOD_AGG.get(p)
if n:
return _aggregate_bars(bars_1m, n)
if p in PERIOD_MINUTES:
try:
n = int(PERIOD_MINUTES[p])
return _aggregate_bars(bars_1m, n)
except (TypeError, ValueError):
pass
return list(bars_1m)
def fetch_ctp_klines(symbol: str, period: str, mode: str) -> Optional[list]:
"""CTP 已连接时由 tick 聚合 K 线;失败返回 None。"""
try:
from vnpy_bridge import ctp_status, get_bridge
if not ctp_status(mode).get("connected"):
return None
bars_1m = get_bridge().get_kline_bars_1m(symbol, mode=mode)
if not bars_1m:
return None
return compose_period_bars(bars_1m, period)
except Exception as exc:
logger.debug("fetch_ctp_klines %s %s: %s", symbol, period, exc)
return None
+30 -2
View File
@@ -221,6 +221,9 @@ def fetch_market_klines(
period: str, period: str,
db_path: Optional[str] = None, db_path: Optional[str] = None,
force_remote: bool = False, force_remote: bool = False,
*,
trading_mode: Optional[str] = None,
prefer_ctp: bool = True,
) -> dict: ) -> dict:
chart_sym = ths_to_sina_chart_symbol(symbol) chart_sym = ths_to_sina_chart_symbol(symbol)
p = (period or "15m").lower() p = (period or "15m").lower()
@@ -232,8 +235,32 @@ def fetch_market_klines(
bars: list = [] bars: list = []
source = "remote" source = "remote"
cached_at = None cached_at = None
ctp_connected = False
if db_path and chart_sym and not force_remote: if prefer_ctp:
try:
from ctp_kline import fetch_ctp_klines
from vnpy_bridge import ctp_status
mode = trading_mode
if not mode:
try:
from app import get_setting
from trading_context import get_trading_mode
mode = get_trading_mode(get_setting)
except Exception:
mode = "simulation"
ctp_connected = bool(ctp_status(mode).get("connected"))
if ctp_connected:
ctp_bars = fetch_ctp_klines(symbol, p, mode)
if ctp_bars:
bars = ctp_bars
source = "ctp"
except Exception as exc:
logger.debug("ctp kline fetch failed %s %s: %s", symbol, p, exc)
if not bars and db_path and chart_sym and not force_remote:
try: try:
conn = connect_db(db_path) conn = connect_db(db_path)
cached = get_cached_entry(conn, chart_sym, p) cached = get_cached_entry(conn, chart_sym, p)
@@ -250,7 +277,7 @@ def fetch_market_klines(
if remote_bars: if remote_bars:
bars = remote_bars bars = remote_bars
source = "remote" source = "remote"
if db_path and chart_sym: if db_path and chart_sym and not ctp_connected:
try: try:
conn = connect_db(db_path) conn = connect_db(db_path)
ensure_kline_tables(conn) ensure_kline_tables(conn)
@@ -290,6 +317,7 @@ def fetch_market_klines(
"prev_close": prev_close, "prev_close": prev_close,
"source": source, "source": source,
"cached_at": cached_at, "cached_at": cached_at,
"ctp_connected": ctp_connected,
} }
+11 -2
View File
@@ -114,7 +114,12 @@ class KlineStreamHub:
logger.warning("kline refresh check failed: %s", exc) logger.warning("kline refresh check failed: %s", exc)
return True return True
def worker_loop(self, db_path: str, quote_fn: Callable[..., dict]) -> None: def worker_loop(
self,
db_path: str,
quote_fn: Callable[..., dict],
get_mode_fn: Optional[Callable[[], str]] = None,
) -> None:
while True: while True:
try: try:
subs = self._snapshot_subs() subs = self._snapshot_subs()
@@ -123,7 +128,11 @@ class KlineStreamHub:
continue continue
try: try:
kline_data = fetch_market_klines( kline_data = fetch_market_klines(
sub.symbol, sub.period, db_path, force_remote=True, sub.symbol,
sub.period,
db_path,
force_remote=True,
trading_mode=get_mode_fn() if get_mode_fn else None,
) )
if kline_data.get("bars"): if kline_data.get("bars"):
self.publish(sub, "kline", kline_data) self.publish(sub, "kline", kline_data)
+20 -2
View File
@@ -574,6 +574,12 @@
} }
} }
function klineSourceLabel(src) {
if (src === 'ctp') return 'CTP';
if (src === 'local') return '本地缓存';
return '新浪';
}
function updateRefreshHint(disconnected) { function updateRefreshHint(disconnected) {
var el = document.getElementById('market-refresh-hint'); var el = document.getElementById('market-refresh-hint');
if (!el) return; if (!el) return;
@@ -589,7 +595,10 @@
el.textContent = ''; el.textContent = '';
return; return;
} }
var src = lastData && lastData.source === 'local' ? ' · 本地缓存' : ''; var src = '';
if (lastData && lastData.source) {
src = ' · ' + klineSourceLabel(lastData.source);
}
if (isTradingSession()) { if (isTradingSession()) {
el.textContent = '交易中 · 后台刷新 · SSE 推送(约1秒)' + src; el.textContent = '交易中 · 后台刷新 · SSE 推送(约1秒)' + src;
} else { } else {
@@ -614,6 +623,9 @@
if (priceEl) { if (priceEl) {
priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—'; priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—';
} }
if (data.quote_source && lastData) {
updateQuoteMeta(Object.assign({}, lastData, { quote_source: data.quote_source }));
}
if (data.prev_close != null) { if (data.prev_close != null) {
lastPrevClose = data.prev_close; lastPrevClose = data.prev_close;
updatePrevCloseDisplay(data.prev_close); updatePrevCloseDisplay(data.prev_close);
@@ -689,7 +701,13 @@
function updateQuoteMeta(data) { function updateQuoteMeta(data) {
var meta = document.getElementById('market-quote-meta'); var meta = document.getElementById('market-quote-meta');
if (meta) { if (meta) {
meta.textContent = data.count ? ('共 ' + data.count + ' 根 · ' + periodLabel(data.period)) : ''; var parts = [];
if (data.count) parts.push('共 ' + data.count + ' 根 · ' + periodLabel(data.period));
if (data.source) parts.push('K线 ' + klineSourceLabel(data.source));
if (data.quote_source) {
parts.push('报价 ' + (data.quote_source === 'ctp' ? 'CTP' : '新浪'));
}
meta.textContent = parts.join(' · ');
} }
var nameEl = document.getElementById('market-quote-name'); var nameEl = document.getElementById('market-quote-name');
var hiddenName = document.getElementById('market-symbol-name'); var hiddenName = document.getElementById('market-symbol-name');
+1 -1
View File
@@ -45,7 +45,7 @@
<div class="market-chart-empty" id="market-chart-empty">请选择合约并点击「查看」</div> <div class="market-chart-empty" id="market-chart-empty">请选择合约并点击「查看」</div>
<div class="market-chart-loading" id="market-chart-loading">连接中…</div> <div class="market-chart-loading" id="market-chart-loading">连接中…</div>
</div> </div>
<p class="hint">数据来源:新浪财经。拖拽左右平移、滚轮缩放;按住图表上下拖动可平移价格轴。可视区内自动标注最高/最低价。</p> <p class="hint">数据来源:{% if ctp_connected %}CTP 柜台 tick 聚合(实时价与 K 线){% else %}CTP 未连接时 K 线与报价回退新浪{% endif %}。拖拽左右平移、滚轮缩放;按住图表上下拖动可平移价格轴。可视区内自动标注最高/最低价。</p>
</div> </div>
<style> <style>
+172 -2
View File
@@ -5,6 +5,7 @@ import logging
import os import os
import threading import threading
import time import time
from collections import deque
from typing import Any, Optional from typing import Any, Optional
from locale_fix import ensure_process_locale from locale_fix import ensure_process_locale
@@ -91,6 +92,8 @@ class CtpBridge:
self._commission_hooked = False self._commission_hooked = False
self._subscribed: set[str] = set() self._subscribed: set[str] = set()
self._tick_hooked = False self._tick_hooked = False
self._bar_generators: dict[str, Any] = {}
self._bars_1m: dict[str, deque] = {}
self._init_engine() self._init_engine()
def _init_engine(self) -> None: def _init_engine(self) -> None:
@@ -241,6 +244,12 @@ class CtpBridge:
bridge = self bridge = self
def on_rsp(data: dict, error: dict, reqid: int, last: bool) -> None: def on_rsp(data: dict, error: dict, reqid: int, last: bool) -> None:
if error and int(error.get("ErrorID") or 0) != 0:
logger.debug(
"CTP commission error reqid=%s: %s",
reqid,
error.get("ErrorMsg") or error,
)
if data and data.get("InstrumentID"): if data and data.get("InstrumentID"):
bridge._commission_results[reqid] = dict(data) bridge._commission_results[reqid] = dict(data)
ev = bridge._commission_waiters.get(reqid) ev = bridge._commission_waiters.get(reqid)
@@ -255,8 +264,7 @@ class CtpBridge:
if self._connected_mode != mode or not self._engine: if self._connected_mode != mode or not self._engine:
return {} return {}
try: try:
from ctp_symbol import ths_to_vnpy_symbol sym, ex_name = ths_to_vnpy_symbol(ths_code)
sym, _ = ths_to_vnpy_symbol(ths_code)
gw = self._engine.get_gateway(GATEWAY_NAME) gw = self._engine.get_gateway(GATEWAY_NAME)
td = gw.td_api td = gw.td_api
except Exception as exc: except Exception as exc:
@@ -275,6 +283,7 @@ class CtpBridge:
"BrokerID": td.brokerid, "BrokerID": td.brokerid,
"InvestorID": td.userid, "InvestorID": td.userid,
"InstrumentID": sym, "InstrumentID": sym,
"ExchangeID": ex_name,
} }
ret = td.reqQryInstrumentCommissionRate(req, reqid) ret = td.reqQryInstrumentCommissionRate(req, reqid)
if ret != 0: if ret != 0:
@@ -315,11 +324,160 @@ class CtpBridge:
logger.debug("lookup tick: %s", exc) logger.debug("lookup tick: %s", exc)
return None return None
def _bar_to_dict(self, bar: Any) -> dict:
dt = getattr(bar, "datetime", None)
d_str = dt.strftime("%Y-%m-%d %H:%M:%S") if dt else ""
return {
"d": d_str,
"o": float(getattr(bar, "open_price", 0) or 0),
"h": float(getattr(bar, "high_price", 0) or 0),
"l": float(getattr(bar, "low_price", 0) or 0),
"c": float(getattr(bar, "close_price", 0) or 0),
"v": float(getattr(bar, "volume", 0) or 0),
}
def _ensure_bar_generator(self, sym: str, ex_name: str) -> None:
key = self._tick_key(sym, ex_name)
if key in self._bar_generators:
return
self._bars_1m[key] = deque(maxlen=4000)
def on_bar(bar: Any) -> None:
row = self._bar_to_dict(bar)
if row.get("d"):
self._bars_1m[key].append(row)
try:
from vnpy.trader.utility import BarGenerator
self._bar_generators[key] = BarGenerator(on_bar=on_bar)
except ImportError:
logger.debug("BarGenerator unavailable")
def _find_tick(self, symbol: str, ex_name: str) -> Any:
if not self._engine:
return None
sym_l = symbol.lower()
ex_u = ex_name.upper()
try:
for tick in self._engine.get_all_ticks():
ts = (getattr(tick, "symbol", "") or "").lower()
te = getattr(tick, "exchange", None)
te_s = str(te.value if hasattr(te, "value") else te or "").upper()
if ts == sym_l and te_s == ex_u:
return tick
except Exception as exc:
logger.debug("find tick: %s", exc)
return None
def _tick_to_bar(self, symbol: str, ex_name: str) -> Optional[dict]:
tick = self._find_tick(symbol, ex_name)
if not tick:
return None
lp = self._price_from_tick(tick)
if not lp or lp <= 0:
return None
dt = getattr(tick, "datetime", None)
d_str = dt.strftime("%Y-%m-%d %H:%M:%S") if dt else ""
if not d_str:
from datetime import datetime
from zoneinfo import ZoneInfo
d_str = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
o = float(getattr(tick, "open_price", 0) or lp)
h = float(getattr(tick, "high_price", 0) or lp)
lo = float(getattr(tick, "low_price", 0) or lp)
return {
"d": d_str,
"o": o,
"h": h,
"l": lo,
"c": lp,
"v": float(getattr(tick, "volume", 0) or 0),
}
def _on_tick(self, tick: Any) -> None:
sym = (getattr(tick, "symbol", "") or "").lower()
te = getattr(tick, "exchange", None)
ex_s = str(te.value if hasattr(te, "value") else te or "").upper()
key = self._tick_key(sym, ex_s)
bg = self._bar_generators.get(key)
if not bg:
return
try:
bg.update_tick(tick)
except Exception as exc:
logger.debug("bar gen tick: %s", exc)
def _ensure_tick_handler(self) -> None: def _ensure_tick_handler(self) -> None:
if self._tick_hooked or not self._ee: if self._tick_hooked or not self._ee:
return return
try:
from vnpy.trader.event import EVENT_TICK
except ImportError:
return
def process_tick(event: Any) -> None:
self._on_tick(event.data)
self._ee.register(EVENT_TICK, process_tick)
self._tick_hooked = True self._tick_hooked = True
def get_kline_bars_1m(self, ths_code: str, *, mode: str) -> list[dict]:
"""订阅合约并返回 1 分钟 K 线(含正在形成的 bar)。"""
if self._connected_mode != mode or not self._engine:
return []
try:
sym, ex_name = ths_to_vnpy_symbol(ths_code)
except Exception:
return []
key = self._tick_key(sym, ex_name)
self._ensure_bar_generator(sym, ex_name)
self.subscribe_symbol(ths_code)
for _ in range(12):
if self._bars_1m.get(key) and len(self._bars_1m[key]) > 0:
break
if self._lookup_tick(sym, ex_name):
break
time.sleep(0.2)
bars_1m = list(self._bars_1m.get(key, []))
bg = self._bar_generators.get(key)
if bg and getattr(bg, "bar", None):
forming = self._bar_to_dict(bg.bar)
if forming.get("d"):
if not bars_1m or bars_1m[-1]["d"] != forming["d"]:
bars_1m.append(forming)
else:
bars_1m[-1] = forming
if not bars_1m:
tick_bar = self._tick_to_bar(sym, ex_name)
if tick_bar:
bars_1m = [tick_bar]
return bars_1m
def get_tick_detail(self, ths_code: str, *, mode: str) -> dict[str, Any]:
if self._connected_mode != mode or not self._engine:
return {}
try:
sym, ex_name = ths_to_vnpy_symbol(ths_code)
except Exception:
return {}
self.subscribe_symbol(ths_code)
for _ in range(8):
tick = self._find_tick(sym, ex_name)
if tick:
price = self._price_from_tick(tick)
try:
pre_close = float(getattr(tick, "pre_close", 0) or 0)
except (TypeError, ValueError):
pre_close = 0.0
return {
"price": price,
"pre_close": pre_close if pre_close > 0 else None,
}
time.sleep(0.2)
return {}
def subscribe_symbol(self, ths_code: str) -> None: def subscribe_symbol(self, ths_code: str) -> None:
if not self._engine or not self._connected_mode: if not self._engine or not self._connected_mode:
return return
@@ -328,6 +486,7 @@ class CtpBridge:
sym, ex_name = ths_to_vnpy_symbol(ths_code) sym, ex_name = ths_to_vnpy_symbol(ths_code)
key = self._tick_key(sym, ex_name) key = self._tick_key(sym, ex_name)
self._ensure_bar_generator(sym, ex_name)
if key in self._subscribed: if key in self._subscribed:
return return
exchange = to_vnpy_exchange(ex_name) exchange = to_vnpy_exchange(ex_name)
@@ -564,6 +723,17 @@ def ctp_get_tick_price(mode: str, ths_code: str) -> Optional[float]:
return None return None
def ctp_get_tick_detail(mode: str, ths_code: str) -> dict[str, Any]:
b = get_bridge()
if b.connected_mode != mode:
return {}
try:
return b.get_tick_detail(ths_code, mode=mode)
except Exception as exc:
logger.debug("ctp_get_tick_detail: %s", exc)
return {}
def get_ctp_balance(mode: str) -> Optional[float]: def get_ctp_balance(mode: str) -> Optional[float]:
try: try:
acc = ctp_get_account(mode) acc = ctp_get_account(mode)