fix: K线新浪历史补齐与手续费页布局及CTP批量同步

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-24 13:26:53 +08:00
parent 3fe4add8e1
commit de6815d481
8 changed files with 178 additions and 47 deletions
+5 -1
View File
@@ -32,6 +32,8 @@ from fee_specs import (
get_fee_multiplier,
get_fee_source_mode,
list_all_fee_rates,
list_fee_rates_for_ui,
count_fee_rates_by_source,
load_fee_rates_from_json,
upsert_fee_rate,
)
@@ -1612,13 +1614,15 @@ def fees():
flash(f"已保存 {product} 费率")
return redirect(url_for("fees"))
rates = list_all_fee_rates()
rates = list_fee_rates_for_ui()
fee_counts = count_fee_rates_by_source()
multiplier = get_setting("fee_multiplier", "2")
fee_source_mode = get_fee_source_mode()
ctp_st = ctp_status(mode)
return render_template(
"fees.html",
rates=rates,
fee_counts=fee_counts,
multiplier=multiplier,
fee_source_mode=fee_source_mode,
ctp_connected=bool(ctp_st.get("connected")),
+25 -3
View File
@@ -67,14 +67,36 @@ def sync_fees_from_ctp(mode: str, *, max_symbols: int = 80) -> tuple[int, str]:
if not bridge.ping():
return 0, "CTP 连接无效,请重连"
seen: set[str] = set()
ok = 0
errors = 0
batch = bridge.query_all_commissions(mode=mode)
if batch:
for raw in batch:
inst = str(raw.get("InstrumentID") or "").strip()
product = _product_from_instrument(inst)
if not product or product in seen:
continue
seen.add(product)
try:
fields = ctp_commission_to_fee_fields(raw, inst or product)
upsert_fee_rate(product, fields)
ok += 1
except Exception as exc:
logger.debug("CTP fee batch %s: %s", inst, exc)
errors += 1
if ok > 0:
msg = f"已从 CTP 批量同步 {ok} 个品种手续费"
if errors:
msg += f"{errors} 个跳过)"
return ok, msg
symbols = _collect_main_ths_codes()[:max_symbols]
if not symbols:
return 0, "无主力合约列表"
seen: set[str] = set()
ok = 0
errors = 0
for ths in symbols:
product = _product_from_instrument(ths)
if not product or product in seen:
+25
View File
@@ -296,6 +296,31 @@ def list_all_fee_rates() -> list:
return [dict(r) for r in rows]
def list_fee_rates_for_ui() -> list:
"""手续费页展示:CTP 模式下 ctp 来源优先排前。"""
rows = list_all_fee_rates()
if get_fee_source_mode() == "ctp":
rows.sort(
key=lambda r: (
0 if (r.get("source") or "") == "ctp" else 1,
r.get("product") or "",
)
)
return rows
def count_fee_rates_by_source() -> dict[str, int]:
conn = _get_db()
rows = conn.execute(
"SELECT source, COUNT(*) AS n FROM fee_rates GROUP BY source"
).fetchall()
conn.close()
out: dict[str, int] = {}
for row in rows:
out[str(row["source"] or "local")] = int(row["n"] or 0)
return out
def upsert_fee_rate(product: str, fields: dict) -> None:
product = product.lower().strip()
conn = _get_db()
+39 -6
View File
@@ -17,6 +17,9 @@ from kline_store import ensure_kline_tables, get_cached_entry, save_bars
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
# CTP tick 聚合 bar 少于此数时,用新浪历史补齐走势
MIN_CTP_KLINE_BARS = 15
PERIOD_MINUTES = {
"1m": "1",
"3m": "3",
@@ -165,6 +168,24 @@ def _merge_bars(chunk: list) -> dict:
}
def _merge_kline_bars(history: list, live: list) -> list:
"""新浪历史 + CTP 实时尾部(去重叠)。"""
if not history:
return list(live or [])
if not live:
return list(history)
first_live = _bar_datetime(live[0])
if not first_live:
return history + live
trimmed = []
for bar in history:
dt = _bar_datetime(bar)
if dt and dt < first_live:
trimmed.append(bar)
merged = trimmed + list(live)
return merged if merged else list(history)
def _weekly_from_daily(daily: list) -> list:
if not daily:
return []
@@ -236,6 +257,7 @@ def fetch_market_klines(
source = "remote"
cached_at = None
ctp_connected = False
ctp_bars: list = []
if prefer_ctp:
try:
@@ -253,14 +275,21 @@ def fetch_market_klines(
mode = "simulation"
ctp_connected = bool(ctp_status(mode).get("connected"))
if ctp_connected:
ctp_bars = fetch_ctp_klines(symbol, p, mode)
if ctp_bars:
bars = ctp_bars
source = "ctp"
ctp_bars = fetch_ctp_klines(symbol, p, mode) or []
except Exception as exc:
logger.debug("ctp kline fetch failed %s %s: %s", symbol, p, exc)
if not bars and db_path and chart_sym and not force_remote:
need_sina = (
force_remote
or not ctp_bars
or len(ctp_bars) < MIN_CTP_KLINE_BARS
)
if ctp_bars and len(ctp_bars) >= MIN_CTP_KLINE_BARS:
bars = ctp_bars
source = "ctp"
if not bars and db_path and chart_sym and not force_remote and need_sina:
try:
conn = connect_db(db_path)
cached = get_cached_entry(conn, chart_sym, p)
@@ -272,9 +301,13 @@ def fetch_market_klines(
except Exception as exc:
logger.warning("kline cache read failed %s %s: %s", chart_sym, p, exc)
if force_remote or not bars:
if not bars or len(ctp_bars) < MIN_CTP_KLINE_BARS:
remote_bars = fetch_sina_klines(symbol, p)
if remote_bars:
if ctp_bars and ctp_connected:
bars = _merge_kline_bars(remote_bars, ctp_bars)
source = "ctp+remote"
else:
bars = remote_bars
source = "remote"
if db_path and chart_sym and not ctp_connected:
+1
View File
@@ -576,6 +576,7 @@
function klineSourceLabel(src) {
if (src === 'ctp') return 'CTP';
if (src === 'ctp+remote') return '新浪+CTP';
if (src === 'local') return '本地缓存';
return '新浪';
}
+41 -17
View File
@@ -4,6 +4,15 @@
<style>
.fees-split{margin-bottom:1.25rem}
.fees-split .card{margin-bottom:0;min-height:auto}
.fees-table-card .trade-table-wrap{
max-height:min(70vh,560px);
width:100%;
border:none;
border-radius:10px;
}
.fees-table-card .trade-table{min-width:1100px}
.fees-table-card .card-body{padding:.75rem 1rem 1rem}
.fees-source-stats{font-size:.78rem;margin-top:.35rem}
</style>
{% endblock %}
{% block content %}
@@ -36,6 +45,15 @@
<span class="badge planned">CTP 未连接</span>
{% endif %}
</div>
{% if fee_counts %}
<p class="fees-source-stats text-muted">
已缓存:
{% if fee_counts.get('ctp') %}<span class="badge profit">CTP {{ fee_counts.ctp }}</span>{% endif %}
{% if fee_counts.get('local') %}<span class="badge planned">local {{ fee_counts.local }}</span>{% endif %}
{% if fee_counts.get('json') %}<span class="badge planned">json {{ fee_counts.json }}</span>{% endif %}
{% if fee_counts.get('manual') %}<span class="badge planned">manual {{ fee_counts.manual }}</span>{% endif %}
</p>
{% endif %}
</div>
</div>
@@ -63,10 +81,10 @@
</div>
</div>
<div class="card">
<div class="card fees-table-card">
<h2>品种费率表</h2>
<div class="card-body card-scroll">
<div class="table-responsive">
<div class="card-body">
<div class="trade-table-wrap">
<table class="trade-table">
<thead>
<tr>
@@ -79,23 +97,20 @@
</thead>
<tbody>
{% for r in rates %}
{% set fid = 'fee-row-' ~ r.product %}
<tr>
<form action="{{ url_for('fees') }}" method="post" style="display:contents">
<input type="hidden" name="action" value="save_row">
<input type="hidden" name="product" value="{{ r.product }}">
<td><strong>{{ r.product }}</strong></td>
<td><span class="badge {% if r.source == 'ctp' %}profit{% else %}planned{% endif %}">{{ r.source or 'local' }}</span></td>
<td><input name="exchange" value="{{ r.exchange or '' }}" style="width:72px;padding:.3rem"></td>
<td><input name="mult" type="number" value="{{ r.mult }}" style="width:56px;padding:.3rem"></td>
<td><input name="open_fixed" type="number" step="0.0001" value="{{ r.open_fixed }}" style="width:72px;padding:.3rem"></td>
<td><input name="open_ratio" type="number" step="0.0000001" value="{{ r.open_ratio }}" style="width:88px;padding:.3rem"></td>
<td><input name="close_yesterday_fixed" type="number" step="0.0001" value="{{ r.close_yesterday_fixed }}" style="width:72px;padding:.3rem"></td>
<td><input name="close_yesterday_ratio" type="number" step="0.0000001" value="{{ r.close_yesterday_ratio }}" style="width:88px;padding:.3rem"></td>
<td><input name="close_today_fixed" type="number" step="0.0001" value="{{ r.close_today_fixed }}" style="width:72px;padding:.3rem"></td>
<td><input name="close_today_ratio" type="number" step="0.0000001" value="{{ r.close_today_ratio }}" style="width:88px;padding:.3rem"></td>
<td><input name="exchange" form="{{ fid }}" value="{{ r.exchange or '' }}" style="width:72px;padding:.3rem"></td>
<td><input name="mult" form="{{ fid }}" type="number" value="{{ r.mult }}" style="width:56px;padding:.3rem"></td>
<td><input name="open_fixed" form="{{ fid }}" type="number" step="0.0001" value="{{ r.open_fixed }}" style="width:72px;padding:.3rem"></td>
<td><input name="open_ratio" form="{{ fid }}" type="number" step="0.0000001" value="{{ r.open_ratio }}" style="width:88px;padding:.3rem"></td>
<td><input name="close_yesterday_fixed" form="{{ fid }}" type="number" step="0.0001" value="{{ r.close_yesterday_fixed }}" style="width:72px;padding:.3rem"></td>
<td><input name="close_yesterday_ratio" form="{{ fid }}" type="number" step="0.0000001" value="{{ r.close_yesterday_ratio }}" style="width:88px;padding:.3rem"></td>
<td><input name="close_today_fixed" form="{{ fid }}" type="number" step="0.0001" value="{{ r.close_today_fixed }}" style="width:72px;padding:.3rem"></td>
<td><input name="close_today_ratio" form="{{ fid }}" type="number" step="0.0000001" value="{{ r.close_today_ratio }}" style="width:88px;padding:.3rem"></td>
<td class="text-muted" style="font-size:.72rem">{{ (r.updated_at or '')[:16] }}</td>
<td><button type="submit" class="btn-link">保存</button></td>
</form>
<td><button type="submit" form="{{ fid }}" class="btn-link">保存</button></td>
</tr>
{% else %}
<tr><td colspan="12" class="text-muted">暂无费率,请连接 CTP 后同步</td></tr>
@@ -103,9 +118,18 @@
</tbody>
</table>
</div>
{% for r in rates %}
<form id="fee-row-{{ r.product }}" action="{{ url_for('fees') }}" method="post" hidden>
<input type="hidden" name="action" value="save_row">
<input type="hidden" name="product" value="{{ r.product }}">
</form>
{% endfor %}
</div>
<p class="hint" style="margin-top:.75rem">
<p class="hint" style="margin-top:.75rem;padding:0 1rem 1rem">
公式:单边 = 固定(元/手)×手数 + 比例×价格×乘数×手数;往返 = 开仓 + 平仓(平今/平昨自动判断)。
{% if fee_source_mode == 'ctp' and ctp_connected and not fee_counts.get('ctp') %}
<br><strong class="text-loss">当前无 CTP 费率缓存,请点击「从 CTP 同步费率」。</strong>
{% endif %}
</p>
</div>
{% endblock %}
+1 -1
View File
@@ -45,7 +45,7 @@
<div class="market-chart-empty" id="market-chart-empty">请选择合约并点击「查看」</div>
<div class="market-chart-loading" id="market-chart-loading">连接中…</div>
</div>
<p class="hint">数据来源:{% if ctp_connected %}CTP 柜台 tick 聚合(实时价与 K 线){% else %}CTP 未连接时 K 线与报价回退新浪{% endif %}。拖拽左右平移、滚轮缩放;按住图表上下拖动可平移价格轴。可视区内自动标注最高/最低价。</p>
<p class="hint">数据来源:{% if ctp_connected %}报价来自 CTP;K 线历史由新浪补齐、最新 bar 由 CTP tick 更新{% else %}CTP 未连接时 K 线与报价回退新浪{% endif %}。拖拽左右平移、滚轮缩放;按住图表上下拖动可平移价格轴。可视区内自动标注最高/最低价。</p>
</div>
<style>
+36 -14
View File
@@ -88,7 +88,7 @@ class CtpBridge:
self._last_error: str = ""
self._connect_lock = threading.Lock()
self._commission_waiters: dict[int, threading.Event] = {}
self._commission_results: dict[int, dict] = {}
self._commission_lists: dict[int, list] = {}
self._commission_hooked = False
self._subscribed: set[str] = set()
self._tick_hooked = False
@@ -251,7 +251,7 @@ class CtpBridge:
error.get("ErrorMsg") or error,
)
if data and data.get("InstrumentID"):
bridge._commission_results[reqid] = dict(data)
bridge._commission_lists.setdefault(reqid, []).append(dict(data))
ev = bridge._commission_waiters.get(reqid)
if last and ev:
ev.set()
@@ -259,21 +259,26 @@ class CtpBridge:
td.onRspQryInstrumentCommissionRate = on_rsp # type: ignore[method-assign]
self._commission_hooked = True
def query_instrument_commission(self, ths_code: str, *, mode: str) -> dict:
"""查询单合约 CTP 手续费率(需已连接)。"""
def _query_commission(
self,
*,
mode: str,
instrument_id: str = "",
exchange_id: str = "",
timeout: float = 8,
) -> list[dict]:
if self._connected_mode != mode or not self._engine:
return {}
return []
try:
sym, ex_name = ths_to_vnpy_symbol(ths_code)
gw = self._engine.get_gateway(GATEWAY_NAME)
td = gw.td_api
except Exception as exc:
logger.debug("commission query init: %s", exc)
return {}
return []
if not getattr(td, "login_status", False):
return {}
return []
if not hasattr(td, "reqQryInstrumentCommissionRate"):
return {}
return []
self._ensure_commission_callback()
reqid = int(getattr(td, "reqid", 0)) + 1
td.reqid = reqid
@@ -282,16 +287,33 @@ class CtpBridge:
req = {
"BrokerID": td.brokerid,
"InvestorID": td.userid,
"InstrumentID": sym,
"ExchangeID": ex_name,
"InstrumentID": instrument_id or "",
"ExchangeID": exchange_id or "",
}
ret = td.reqQryInstrumentCommissionRate(req, reqid)
if ret != 0:
self._commission_waiters.pop(reqid, None)
return {}
ev.wait(timeout=8)
return []
ev.wait(timeout=timeout)
self._commission_waiters.pop(reqid, None)
return self._commission_results.pop(reqid, {})
return self._commission_lists.pop(reqid, [])
def query_instrument_commission(self, ths_code: str, *, mode: str) -> dict:
"""查询单合约 CTP 手续费率(需已连接)。"""
try:
sym, ex_name = ths_to_vnpy_symbol(ths_code)
except Exception:
return {}
rows = self._query_commission(
mode=mode,
instrument_id=sym,
exchange_id=ex_name,
)
return rows[-1] if rows else {}
def query_all_commissions(self, *, mode: str) -> list[dict]:
"""批量查询全部合约手续费(InstrumentID 留空)。"""
return self._query_commission(mode=mode, timeout=45)
def _tick_key(self, symbol: str, ex_name: str) -> str:
return f"{symbol.lower()}:{ex_name.upper()}"