From 040436e9ccee43138eb234bafadbf8c46aad138a Mon Sep 17 00:00:00 2001 From: dekun Date: Thu, 25 Jun 2026 14:06:19 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=8C=81=E4=BB=93=E7=9B=91=E6=8E=A7?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=BA=93=E4=BC=98=E5=85=88=E6=98=BE=E7=A4=BA?= =?UTF-8?q?=EF=BC=8C=E4=BF=AE=E5=A4=8D=E5=BC=80=E4=BB=93=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E4=B8=8E=E5=90=8C=E6=AD=A5=E5=89=8D=E7=A9=BA=E7=99=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- install_trading.py | 513 +++++++++++++++++++++++++++++-------------- sl_tp_guard.py | 21 +- static/js/trade.js | 35 ++- templates/trade.html | 4 +- 4 files changed, 390 insertions(+), 183 deletions(-) diff --git a/install_trading.py b/install_trading.py index 49aca31..121f5a7 100644 --- a/install_trading.py +++ b/install_trading.py @@ -248,142 +248,343 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception: return [] + def _canonical_position_key(symbol: str, direction: str) -> str: + sym = (symbol or "").strip() + d = (direction or "long").strip().lower() + try: + vnpy_sym, _ = ths_to_vnpy_symbol(sym) + return f"{vnpy_sym.lower()}:{d}" + except Exception: + return f"{sym.lower()}:{d}" + + def _find_active_monitor(conn, symbol: str, direction: str) -> Optional[dict]: + direction = (direction or "long").strip().lower() + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" + ).fetchall(): + row = dict(r) + if (row.get("direction") or "long") != direction: + continue + if _match_ctp_symbol(symbol, row.get("symbol") or ""): + return row + return None + + def _close_duplicate_monitors(conn, symbol: str, direction: str, keep_id: int) -> None: + direction = (direction or "long").strip().lower() + for r in conn.execute( + "SELECT id, symbol, direction FROM trade_order_monitors WHERE status='active'" + ).fetchall(): + if int(r["id"]) == int(keep_id): + continue + if (r["direction"] or "long") != direction: + continue + if _match_ctp_symbol(symbol, r["symbol"] or ""): + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (r["id"],), + ) + + def _upsert_open_monitor( + conn, + *, + sym: str, + direction: str, + lots: int, + price: float, + sl, + tp, + trailing_be: int, + ) -> int: + ensure_monitor_order_columns(conn) + codes = ths_to_codes(sym) or {} + sl_f = float(sl) if sl not in (None, "") else None + tp_f = float(tp) if tp not in (None, "") else None + now_s = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + existing = _find_active_monitor(conn, sym, direction) + if existing: + mid = int(existing["id"]) + initial_sl = existing.get("initial_stop_loss") + if sl_f is not None and initial_sl is None: + initial_sl = sl_f + conn.execute( + """UPDATE trade_order_monitors SET + symbol=?, symbol_name=?, market_code=?, lots=?, entry_price=?, + stop_loss=?, take_profit=?, initial_stop_loss=?, trailing_be=?, open_time=? + WHERE id=?""", + ( + sym, + codes.get("name", sym), + codes.get("market_code", ""), + lots, + price, + sl_f, + tp_f, + initial_sl, + trailing_be, + now_s, + mid, + ), + ) + else: + conn.execute( + """INSERT INTO trade_order_monitors ( + symbol, symbol_name, market_code, direction, lots, entry_price, + stop_loss, take_profit, initial_stop_loss, trailing_be, + open_time, monitor_type, status + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?, 'active')""", + ( + sym, + codes.get("name", sym), + codes.get("market_code", ""), + direction, + lots, + price, + sl_f, + tp_f, + sl_f, + trailing_be, + now_s, + "manual", + ), + ) + mid = int(conn.execute("SELECT last_insert_rowid()").fetchone()[0]) + _close_duplicate_monitors(conn, sym, direction, mid) + return mid + + def _sync_monitor_lots_from_ctp(conn, mid: int, sym: str, direction: str, mode: str) -> None: + for p in _ctp_positions(mode): + if int(p.get("lots") or 0) <= 0: + continue + if (p.get("direction") or "long") != direction: + continue + if not _match_ctp_symbol(p.get("symbol") or "", sym): + continue + conn.execute( + "UPDATE trade_order_monitors SET lots=?, entry_price=? WHERE id=?", + (int(p.get("lots") or 0), float(p.get("avg_price") or 0), mid), + ) + return + + def _compose_position_row( + conn, + *, + mon: Optional[dict], + ctp: Optional[dict], + mode: str, + capital: float, + now_iso: str, + ) -> Optional[dict]: + if not mon and not ctp: + return None + if ctp: + sym = (ctp.get("symbol") or "").strip() + direction = ctp.get("direction") or "long" + lots = int(ctp.get("lots") or 0) + if lots <= 0: + return None + entry = float(ctp.get("avg_price") or 0) + float_pnl = ctp.get("pnl") + if float_pnl is not None: + float_pnl = round(float(float_pnl), 2) + source_label = "CTP 柜台" + else: + sym = (mon.get("symbol") or "").strip() + direction = mon.get("direction") or "long" + lots = int(mon.get("lots") or 0) + if lots <= 0: + return None + entry = float(mon.get("entry_price") or 0) + float_pnl = None + source_label = "本地监控" + + codes = ths_to_codes(sym) + tick = calc_order_tick_metrics(sym, lots, entry) + sl = float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None + tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None + open_time = (mon.get("open_time") or "") if mon else "" + holding = _holding_duration(open_time, now_iso) if open_time else "" + + mark = None + if ctp_status(mode).get("connected"): + mark = ctp_get_tick_price(mode, sym) + if (mark is None or mark <= 0) and codes: + mark = fetch_price( + sym, + codes.get("market_code", ""), + codes.get("sina_code", ""), + ) + close_est = float(mark) if mark and mark > 0 else entry + if float_pnl is None and mark and entry: + pos_tmp = calc_position_metrics( + direction, entry, sl or entry, tp or entry, lots, mark, capital, sym, + ) + float_pnl = pos_tmp.get("float_pnl") + + fee_info = calc_fee_breakdown( + sym, entry, close_est, lots, open_time or now_iso, now_iso, trading_mode=mode, + ) + est_net = None + if float_pnl is not None: + est_net = round(float(float_pnl) - fee_info["total_fee"], 2) + pos_metrics = calc_position_metrics( + direction, entry, sl if sl is not None else entry, + tp if tp is not None else entry, lots, mark, capital, sym, + ) + order_st = monitor_order_status( + mon or {}, mode=mode, ths_code=sym, direction=direction, + ) + pending_for_row: list[dict] = [] + if sl is not None: + pending_for_row.append({ + "order_kind": "stop_loss", + "label": "止损监控", + "price": sl, + "lots": lots, + "source": "monitor", + "monitor_id": mon["id"] if mon else None, + }) + if tp is not None: + pending_for_row.append({ + "order_kind": "take_profit", + "label": "止盈监控", + "price": tp, + "lots": lots, + "source": "monitor", + "monitor_id": mon["id"] if mon else None, + }) + row_key = _canonical_position_key(sym, direction) + return { + "key": row_key, + "source": "ctp" if ctp else "local", + "source_label": source_label, + "sync_pending": ctp is None and mon is not None, + "monitor_id": mon["id"] if mon else None, + "symbol": codes.get("name", sym) if codes else (mon.get("symbol_name") if mon else sym), + "symbol_code": sym, + "direction": direction, + "direction_label": "做多" if direction == "long" else "做空", + "lots": lots, + "entry_price": entry, + "stop_loss": sl, + "take_profit": tp, + "open_time": open_time or None, + "holding_duration": holding or None, + "mark_price": mark, + "current_price": mark, + "margin": pos_metrics.get("margin"), + "position_pct": pos_metrics.get("position_pct"), + "float_pnl": float_pnl, + "est_fee": fee_info["total_fee"], + "est_fee_open": fee_info["open_fee"], + "est_fee_close": fee_info["close_fee"], + "est_fee_close_type": fee_info["close_type"], + "est_pnl_net": est_net, + "sl_order_active": order_st.get("sl_monitoring"), + "tp_order_active": order_st.get("tp_monitoring"), + "sl_monitoring": order_st.get("sl_monitoring"), + "tp_monitoring": order_st.get("tp_monitoring"), + "can_place_orders": False, + "tick_value_total": tick.get("tick_value_total"), + "price_precision": tick.get("price_precision"), + "tick_size": tick.get("tick_size"), + "can_close": True, + "pending_orders": pending_for_row, + "trailing_be": bool(mon.get("trailing_be")) if mon else False, + "trailing_r_locked": int(mon.get("trailing_r_locked") or 0) if mon else 0, + } + def _build_trading_live_rows(conn) -> list[dict]: from zoneinfo import ZoneInfo tz = ZoneInfo("Asia/Shanghai") now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") mode = get_trading_mode(get_setting) - ctp_st = ctp_status(mode) - rows: list[dict] = [] capital = _capital(conn) - - if not ctp_st.get("connected"): - return rows - ensure_monitor_order_columns(conn) - # 程序监控仅用于补充止损/止盈,持仓以 CTP 柜台为准 - monitor_map: dict[tuple[str, str], dict] = {} - for r in conn.execute( - "SELECT * FROM trade_order_monitors WHERE status='active'" - ).fetchall(): - key = (r["symbol"].lower(), r["direction"]) - monitor_map[key] = dict(r) + monitors_raw = [ + dict(r) for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" + ).fetchall() + ] + monitor_by_key: dict[str, dict] = {} + for mon in monitors_raw: + key = _canonical_position_key(mon.get("symbol") or "", mon.get("direction") or "long") + if key not in monitor_by_key: + monitor_by_key[key] = mon - for p in _ctp_positions(mode): - sym = (p.get("symbol") or "").strip() - direction = p.get("direction") or "long" - lots = int(p.get("lots") or 0) - if lots <= 0: + ctp_list: list[dict] = _ctp_positions(mode) if ctp_status(mode).get("connected") else [] + ctp_by_key: dict[str, dict] = {} + for p in ctp_list: + if int(p.get("lots") or 0) <= 0: continue - entry = float(p.get("avg_price") or 0) - float_pnl = p.get("pnl") - if float_pnl is not None: - float_pnl = round(float(float_pnl), 2) - codes = ths_to_codes(sym) - tick = calc_order_tick_metrics(sym, lots, entry) - mon = None - for (ms, md), mv in monitor_map.items(): - if md != direction: - continue - if ms == sym.lower() or _match_ctp_symbol(sym, ms): - mon = mv - break - sl = float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None - tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None - open_time = (mon.get("open_time") or "") if mon else "" - holding = _holding_duration(open_time, now_iso) if open_time else "" - mark = ctp_get_tick_price(mode, sym) - if (mark is None or mark <= 0) and codes: - mark = fetch_price( - sym, - codes.get("market_code", ""), - codes.get("sina_code", ""), + key = _canonical_position_key(p.get("symbol") or "", p.get("direction") or "long") + ctp_by_key[key] = p + + rows: list[dict] = [] + used_ctp_keys: set[str] = set() + + for key, mon in monitor_by_key.items(): + ctp = ctp_by_key.get(key) + if not ctp: + for ck, cp in ctp_by_key.items(): + if ck in used_ctp_keys: + continue + if (cp.get("direction") or "long") != (mon.get("direction") or "long"): + continue + if _match_ctp_symbol(cp.get("symbol") or "", mon.get("symbol") or ""): + ctp = cp + used_ctp_keys.add(ck) + break + elif key in ctp_by_key: + used_ctp_keys.add(key) + if ctp and mon: + _sync_monitor_lots_from_ctp( + conn, int(mon["id"]), mon.get("symbol") or "", + mon.get("direction") or "long", mode, ) - close_est = float(mark) if mark and mark > 0 else entry - fee_info = calc_fee_breakdown( - sym, - entry, - close_est, - lots, - open_time or now_iso, - now_iso, - trading_mode=mode, + mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon + row = _compose_position_row( + conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, ) - est_net = None - if float_pnl is not None: - est_net = round(float(float_pnl) - fee_info["total_fee"], 2) - pos_metrics = calc_position_metrics( - direction, - entry, - sl if sl is not None else entry, - tp if tp is not None else entry, - lots, - mark, - capital, - sym, + if row: + rows.append(row) + + for key, ctp in ctp_by_key.items(): + if key in used_ctp_keys: + continue + matched = False + for uk in used_ctp_keys: + if uk == key: + matched = True + break + if matched: + continue + for existing in rows: + if _match_ctp_symbol( + ctp.get("symbol") or "", existing.get("symbol_code") or "", + ) and (ctp.get("direction") or "long") == (existing.get("direction") or "long"): + matched = True + break + if matched: + continue + mon = _find_active_monitor( + conn, ctp.get("symbol") or "", ctp.get("direction") or "long", ) - order_st = monitor_order_status( - mon or {}, mode=mode, ths_code=sym, direction=direction, + row = _compose_position_row( + conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, ) - pending_for_row: list[dict] = [] - if sl is not None: - pending_for_row.append({ - "order_kind": "stop_loss", - "label": "止损监控", - "price": sl, - "lots": lots, - "source": "monitor", - "monitor_id": mon["id"] if mon else None, - }) - if tp is not None: - pending_for_row.append({ - "order_kind": "take_profit", - "label": "止盈监控", - "price": tp, - "lots": lots, - "source": "monitor", - "monitor_id": mon["id"] if mon else None, - }) - rows.append({ - "key": f"ctp:{sym.lower()}:{direction}", - "source": "ctp", - "source_label": "CTP 柜台", - "monitor_id": mon["id"] if mon else None, - "symbol": codes.get("name", sym) if codes else sym, - "symbol_code": sym, - "direction": direction, - "direction_label": "做多" if direction == "long" else "做空", - "lots": lots, - "entry_price": entry, - "stop_loss": sl, - "take_profit": tp, - "open_time": open_time or None, - "holding_duration": holding or None, - "mark_price": mark, - "current_price": mark, - "margin": pos_metrics.get("margin"), - "position_pct": pos_metrics.get("position_pct"), - "float_pnl": float_pnl, - "est_fee": fee_info["total_fee"], - "est_fee_open": fee_info["open_fee"], - "est_fee_close": fee_info["close_fee"], - "est_fee_close_type": fee_info["close_type"], - "est_pnl_net": est_net, - "sl_order_active": order_st.get("sl_monitoring"), - "tp_order_active": order_st.get("tp_monitoring"), - "sl_monitoring": order_st.get("sl_monitoring"), - "tp_monitoring": order_st.get("tp_monitoring"), - "can_place_orders": False, - "tick_value_total": tick.get("tick_value_total"), - "price_precision": tick.get("price_precision"), - "tick_size": tick.get("tick_size"), - "can_close": True, - "pending_orders": pending_for_row, - "trailing_be": bool(mon.get("trailing_be")) if mon else False, - "trailing_r_locked": int(mon.get("trailing_r_locked") or 0) if mon else 0, - }) - return rows + if row: + rows.append(row) + + seen: set[str] = set() + deduped: list[dict] = [] + for row in rows: + rk = row.get("key") or f"{row.get('symbol_code')}:{row.get('direction')}" + if rk in seen: + continue + seen.add(rk) + deduped.append(row) + return deduped def _build_trading_live_payload(conn) -> dict: mode = get_trading_mode(get_setting) @@ -1033,54 +1234,30 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se sl = d.get("stop_loss") tp = d.get("take_profit") trailing_be = 1 if d.get("trailing_be") else 0 + mid = _upsert_open_monitor( + conn, + sym=sym, + direction=direction, + lots=lots, + price=price, + sl=sl, + tp=tp, + trailing_be=trailing_be, + ) + conn.commit() + _push_position_snapshot_async() import time time.sleep(2.0) - actual_lots = lots - has_pos = False - for p in _ctp_positions(mode): - if int(p.get("lots") or 0) <= 0: - continue - if (p.get("direction") or "long") != direction: - continue - if _match_ctp_symbol(p.get("symbol") or "", sym): - has_pos = True - actual_lots = int(p.get("lots") or lots) - break - if has_pos: - codes = ths_to_codes(sym) - sl_f = float(sl) if sl else None - ensure_monitor_order_columns(conn) - conn.execute( - """INSERT INTO trade_order_monitors ( - symbol, symbol_name, market_code, direction, lots, entry_price, - stop_loss, take_profit, initial_stop_loss, trailing_be, - open_time, monitor_type, status - ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?, 'active')""", - ( - sym, - codes.get("name", sym) if codes else sym, - codes.get("market_code", "") if codes else "", - direction, - actual_lots, - price, - sl_f, - float(tp) if tp else None, - sl_f, - trailing_be, - datetime.now().strftime("%Y-%m-%d %H:%M:%S"), - "manual", - ), - ) - mid = conn.execute("SELECT last_insert_rowid()").fetchone()[0] - mon_row = conn.execute( - "SELECT * FROM trade_order_monitors WHERE id=?", (mid,), - ).fetchone() - if mon_row and (sl or tp): - try: - ensure_monitor_order_columns(conn) - cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode) - except Exception as exc: - logger.warning("清理旧版止盈止损挂单失败: %s", exc) + _sync_monitor_lots_from_ctp(conn, mid, sym, direction, mode) + mon_row = conn.execute( + "SELECT * FROM trade_order_monitors WHERE id=?", (mid,), + ).fetchone() + if mon_row and (sl or tp): + try: + ensure_monitor_order_columns(conn) + cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode) + except Exception as exc: + logger.warning("清理旧版止盈止损挂单失败: %s", exc) conn.commit() send_wechat_msg(f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}") conn.close() diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 083718e..141e0b4 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -464,8 +464,8 @@ def cancel_monitor_exit_orders( return cancelled -def reconcile_monitors_without_position(conn, mode: str) -> int: - """持仓已平时:关闭监控并撤销残留止盈止损挂单。""" +def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120) -> int: + """持仓已平时:关闭监控并撤销残留止盈止损挂单(新开仓 grace_sec 内不清理)。""" if not ctp_status(mode).get("connected"): return 0 positions = ctp_list_positions(mode) @@ -477,9 +477,26 @@ def reconcile_monitors_without_position(conn, mode: str) -> int: direction = p.get("direction") or "long" position_keys.add((sym, direction)) + now_ts = time.time() + + def _monitor_within_grace(mon: dict) -> bool: + raw = (mon.get("open_time") or mon.get("created_at") or "").strip() + if not raw: + return True + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"): + try: + dt = datetime.strptime(raw[:19], fmt) + if (now_ts - dt.timestamp()) <= grace_sec: + return True + except ValueError: + continue + return False + closed = 0 for r in conn.execute("SELECT * FROM trade_order_monitors WHERE status='active'").fetchall(): mon = dict(r) + if _monitor_within_grace(mon): + continue ms = mon.get("symbol") or "" md = mon.get("direction") or "long" matched = False diff --git a/static/js/trade.js b/static/js/trade.js index a73fce9..6c0b6a1 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -130,25 +130,32 @@ riskBadge.className = 'badge ' + (data.risk_status.can_trade ? 'profit' : 'loss'); } var rows = data.rows || []; + var seenKeys = {}; + rows = rows.filter(function (row) { + var k = row.key || ((row.symbol_code || '') + ':' + (row.direction || '')); + if (seenKeys[k]) return false; + seenKeys[k] = true; + return true; + }); hasSlTpMonitoring = rows.some(function (row) { return row.stop_loss != null || row.take_profit != null; }); updateSessionUi(); savePosCache(data); positionsRendered = true; - if (!connected) { - if (connecting) { - list.innerHTML = '
CTP 连接中,请稍候…
'; + if (!rows.length) { + if (!connected) { + if (connecting) { + list.innerHTML = '
CTP 连接中,请稍候…
'; + return; + } + list.innerHTML = '
CTP 未连接,正在尝试自动重连…
'; + tryAutoCtpReconnect(); return; } - list.innerHTML = '
CTP 未连接,正在尝试自动重连…
'; - tryAutoCtpReconnect(); - return; - } - if (!rows.length) { var pendingOnly = data.pending_orders || []; if (pendingOnly.length) { - list.innerHTML = '
柜台暂无持仓
' + + list.innerHTML = '
暂无持仓
' + pendingOnly.map(function (p) { var dismissBtn = p.monitor_id ? '' : ''; @@ -162,10 +169,13 @@ }).join(''); bindPendingDismiss(list); } else { - list.innerHTML = '
柜台暂无持仓。
'; + list.innerHTML = '
暂无持仓。
'; } return; } + if (!connected) { + tryAutoCtpReconnect(); + } list.innerHTML = rows.map(buildPosCard).join(''); bindPendingDismiss(list); bindSlTpButtons(list); @@ -556,7 +566,9 @@ '
' + row.symbol + ' ' + dirBadge + '
' + '
' + (row.symbol_code || '') + '
' + actionBtns + '
' + - '
来源 ' + (row.source_label || 'CTP') + ' · 柜台浮盈' + + '
来源 ' + (row.source_label || 'CTP') + '' + + (row.sync_pending ? ' · 同步柜台中…' : '') + + ' · 浮盈' + (slTpBtn ? ' · ' + slTpBtn : '') + (row.sl_order_active ? ' · 止损监控中' : '') + (row.tp_order_active ? ' · 止盈监控中' : '') + @@ -844,6 +856,7 @@ if (cached) { applyPositionsData(cached); } + pollPositions(); connectPositionStream(); connectRecommendStream(); fetch('/api/recommend/list') diff --git a/templates/trade.html b/templates/trade.html index 25e4481..233dca5 100644 --- a/templates/trade.html +++ b/templates/trade.html @@ -103,9 +103,9 @@

持仓监控

-

后台每秒拉取 CTP 并推送;刷新页面会使用浏览器缓存,不再阻塞读柜台。

+

开仓后立即写入本地监控并显示;后台每秒同步 CTP 柜台更新盈亏与手数。刷新页面优先读本地缓存。

-
{% if ctp_status.connected %}等待持仓推送…{% else %}请先连接 CTP 查看柜台持仓{% endif %}
+
加载本地持仓…