diff --git a/app.py b/app.py index d531040..d77e4d8 100644 --- a/app.py +++ b/app.py @@ -1223,6 +1223,22 @@ def records(): start, end = parse_review_date_filter(preset, start, end) conn = get_db() + ctp_sync_info = None + try: + from ctp_trade_sync import sync_trade_logs_from_ctp + from trading_context import get_account_capital, get_trading_mode + from vnpy_bridge import ctp_status + + mode = get_trading_mode(get_setting) + if ctp_status(mode).get("connected"): + capital = get_account_capital(conn, get_setting) + ctp_sync_info = sync_trade_logs_from_ctp( + conn, mode, capital=capital, trading_mode=mode, + ) + conn.commit() + except Exception as exc: + app.logger.warning("ctp trade sync on records page: %s", exc) + sql = "SELECT * FROM review_records WHERE 1=1" params: list = [] if start: @@ -1264,6 +1280,7 @@ def records(): trades=trades, equity_curve=equity_curve, auto_records=auto_list, + ctp_sync_info=ctp_sync_info, preset=preset, start=start, end=end, diff --git a/ctp_trade_sync.py b/ctp_trade_sync.py new file mode 100644 index 0000000..1641e12 --- /dev/null +++ b/ctp_trade_sync.py @@ -0,0 +1,262 @@ +"""从 CTP 柜台同步成交,写入 trade_logs(以交易所成交为准)。""" +from __future__ import annotations + +import logging +from collections import defaultdict +from datetime import datetime +from typing import Any, Callable, Optional +from zoneinfo import ZoneInfo + +from contract_specs import calc_position_metrics +from ctp_symbol import ths_to_vnpy_symbol +from fee_specs import calc_round_trip_fee +from symbols import ths_to_codes +from trade_log_lib import calc_equity_after, ensure_trade_log_columns +from vnpy_bridge import ctp_list_trades, ctp_status + +logger = logging.getLogger(__name__) +TZ = ZoneInfo("Asia/Shanghai") + + +def _match_symbol(ctp_sym: str, ths: str) -> bool: + a = (ctp_sym or "").lower() + b = (ths or "").lower() + if a == b: + return True + if a and b and a.split(".")[0] == b.split(".")[0]: + return True + try: + vnpy_sym, _ = ths_to_vnpy_symbol(ths) + if a == vnpy_sym.lower(): + return True + except Exception: + pass + return False + + +def _to_ths_code(symbol: str) -> str: + sym = (symbol or "").strip() + if not sym: + return "" + codes = ths_to_codes(sym) + if codes: + return codes.get("ths_code") or sym + return sym.lower() + + +def build_round_trips(trades: list[dict[str, Any]]) -> list[dict[str, Any]]: + """按 FIFO 将开/平仓成交配对为完整回合。""" + stacks: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list) + trips: list[dict[str, Any]] = [] + + ordered = sorted( + trades, + key=lambda t: ((t.get("datetime") or ""), str(t.get("trade_id") or "")), + ) + for t in ordered: + sym = (t.get("symbol") or "").lower() + pos_dir = (t.get("position_direction") or "long").strip().lower() + offset = (t.get("offset") or "open").strip().lower() + lots = int(t.get("lots") or 0) + if not sym or lots <= 0: + continue + key = (sym, pos_dir) + if offset == "open": + stacks[key].append({ + **t, + "remaining": lots, + }) + continue + + close_lots_left = lots + close_price = float(t.get("price") or 0) + close_time = t.get("datetime") or "" + close_trade_id = str(t.get("trade_id") or "") + while close_lots_left > 0 and stacks[key]: + open_t = stacks[key][0] + matched = min(close_lots_left, int(open_t.get("remaining") or 0)) + if matched <= 0: + stacks[key].pop(0) + continue + open_t["remaining"] = int(open_t.get("remaining") or 0) - matched + if open_t["remaining"] <= 0: + stacks[key].pop(0) + close_lots_left -= matched + open_trade_id = str(open_t.get("trade_id") or "") + ctp_key = f"{open_trade_id}|{close_trade_id}|{sym}|{pos_dir}|{matched}" + trips.append({ + "ctp_trade_key": ctp_key, + "symbol": sym, + "ths_code": _to_ths_code(sym), + "direction": pos_dir, + "lots": matched, + "entry_price": float(open_t.get("price") or 0), + "close_price": close_price, + "open_time": open_t.get("datetime") or "", + "close_time": close_time, + "open_trade_id": open_trade_id, + "close_trade_id": close_trade_id, + }) + return trips + + +def _find_monitor_meta( + conn, + *, + symbol: str, + direction: str, + open_time: str, + match_symbol_fn: Callable[[str, str], bool] | None = None, +) -> dict[str, Any]: + match = match_symbol_fn or _match_symbol + direction = (direction or "long").strip().lower() + best: Optional[dict[str, Any]] = None + for r in conn.execute( + "SELECT * FROM trade_order_monitors ORDER BY id DESC LIMIT 200" + ).fetchall(): + row = dict(r) + if (row.get("direction") or "long").strip().lower() != direction: + continue + if not match(symbol, row.get("symbol") or ""): + continue + if best is None: + best = row + continue + ot = (row.get("open_time") or "").strip() + if open_time and ot and abs(len(ot) - len(open_time)) <= 2 and ot[:16] == open_time[:16]: + return row + return best or {} + + +def _holding_minutes(open_time: str, close_time: str) -> int: + try: + from app import holding_to_minutes + return int(holding_to_minutes(open_time, close_time) or 0) + except Exception: + return 0 + + +def sync_trade_logs_from_ctp( + conn, + mode: str, + *, + capital: float = 0.0, + trading_mode: str = "simulation", +) -> dict[str, Any]: + """查询 CTP 成交并 upsert 到 trade_logs。返回同步摘要。""" + stats = {"synced": 0, "updated": 0, "skipped": 0, "connected": False} + if not ctp_status(mode).get("connected"): + return stats + stats["connected"] = True + ensure_trade_log_columns(conn) + try: + conn.execute("ALTER TABLE trade_logs ADD COLUMN source TEXT DEFAULT 'local'") + except Exception: + pass + try: + conn.execute("ALTER TABLE trade_logs ADD COLUMN ctp_trade_key TEXT") + except Exception: + pass + + trades = ctp_list_trades(mode, refresh=True) + trips = build_round_trips(trades) + for trip in trips: + key = trip.get("ctp_trade_key") or "" + if not key: + stats["skipped"] += 1 + continue + existing = conn.execute( + "SELECT id FROM trade_logs WHERE ctp_trade_key=?", + (key,), + ).fetchone() + + ths = trip.get("ths_code") or trip.get("symbol") or "" + codes = ths_to_codes(ths) or {} + direction = trip.get("direction") or "long" + entry = float(trip.get("entry_price") or 0) + close_px = float(trip.get("close_price") or 0) + lots = float(trip.get("lots") or 0) + open_time = trip.get("open_time") or "" + close_time = trip.get("close_time") or datetime.now(TZ).strftime("%Y-%m-%dT%H:%M") + + mon = _find_monitor_meta( + conn, + symbol=trip.get("symbol") or ths, + direction=direction, + open_time=open_time, + ) + sl = mon.get("stop_loss") + tp = mon.get("take_profit") + try: + sl_f = float(sl) if sl is not None else entry + tp_f = float(tp) if tp is not None else entry + except (TypeError, ValueError): + sl_f, tp_f = entry, entry + + metrics = calc_position_metrics( + direction, entry, sl_f, tp_f, lots, close_px, capital, ths, + ) + pnl = float(metrics.get("float_pnl") or 0) + fee = calc_round_trip_fee( + ths, entry, close_px, lots, open_time, close_time, trading_mode=trading_mode, + ) + pnl_net = round(pnl - fee, 2) + margin_pct = metrics.get("position_pct") + equity_after = calc_equity_after(capital, pnl_net) + minutes = _holding_minutes(open_time, close_time) + result = "CTP同步" + monitor_type = mon.get("monitor_type") or "CTP同步" + + row_vals = ( + ths, + codes.get("name") or mon.get("symbol_name") or ths, + codes.get("market_code") or mon.get("market_code") or "", + codes.get("sina_code") or mon.get("sina_code") or "", + monitor_type, + direction, + entry, + sl if sl is not None else None, + tp if tp is not None else None, + close_px, + lots, + metrics.get("margin"), + margin_pct, + minutes, + open_time, + close_time, + pnl, + fee, + pnl_net, + equity_after, + result, + ) + if existing: + conn.execute( + """UPDATE trade_logs SET + symbol=?, symbol_name=?, market_code=?, sina_code=?, monitor_type=?, + direction=?, entry_price=?, stop_loss=?, take_profit=?, close_price=?, + lots=?, margin=?, margin_pct=?, holding_minutes=?, open_time=?, close_time=?, + pnl=?, fee=?, pnl_net=?, equity_after=?, result=?, source='ctp', verified=1 + WHERE ctp_trade_key=?""", + row_vals + (key,), + ) + stats["updated"] += 1 + else: + conn.execute( + """INSERT INTO trade_logs + (symbol, symbol_name, market_code, sina_code, monitor_type, direction, + entry_price, stop_loss, take_profit, close_price, lots, margin, + margin_pct, holding_minutes, open_time, close_time, pnl, fee, pnl_net, + equity_after, result, source, ctp_trade_key, verified) + VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", + row_vals + ("ctp", key, 1), + ) + stats["synced"] += 1 + + if stats["synced"] or stats["updated"]: + try: + from stats_engine import refresh_stats_cache + refresh_stats_cache(conn, capital) + except Exception as exc: + logger.debug("stats refresh after ctp trade sync: %s", exc) + return stats diff --git a/install_trading.py b/install_trading.py index 5b661d1..dbe50f8 100644 --- a/install_trading.py +++ b/install_trading.py @@ -80,6 +80,7 @@ from trading_context import ( ) from ctp_symbol import ths_to_vnpy_symbol from vnpy_bridge import ( + ctp_cancel_order, ctp_connect, ctp_get_account, ctp_get_tick_price, @@ -354,6 +355,25 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "label": "止盈监控", "price": float(tp), }) + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC" + ).fetchall(): + mon = dict(r) + sym = mon.get("symbol") or "" + pending.append({ + "symbol_code": sym, + "symbol": mon.get("symbol_name") or sym, + "direction": mon.get("direction") or "long", + "direction_label": "做多" if (mon.get("direction") or "long") == "long" else "做空", + "lots": int(mon.get("lots") or 0), + "price": float(mon.get("order_price") or mon.get("entry_price") or 0), + "order_kind": "open_pending", + "label": "开仓挂单中", + "source": "monitor", + "monitor_id": mon.get("id"), + "can_cancel_order": is_trading_session(), + "cancel_allowed": is_trading_session(), + }) ctp_st = ctp_status(mode) if ctp_st.get("connected"): for o in _ctp_active_orders(mode): @@ -374,6 +394,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "label": label, "source": "ctp", "order_id": o.get("order_id"), + "can_cancel_order": is_trading_session(), + "cancel_allowed": is_trading_session(), }) return pending @@ -833,7 +855,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "est_fee": None, "can_close": False, "close_allowed": False, - "can_cancel_order": True, + "can_cancel_order": is_trading_session(), + "cancel_allowed": is_trading_session(), "auto_cancel_sec": remain, "pending_timeout_sec": timeout_sec, "pending_timeout_min": max(1, timeout_sec // 60), @@ -1285,6 +1308,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return jsonify({"ok": False, "error": "记录不存在或已关闭"}), 404 mon = dict(row) if (mon.get("status") or "").strip().lower() == "pending": + if not is_trading_session(): + return jsonify({"ok": False, "error": "不在交易时间段,无法撤单"}), 403 ok, msg = cancel_pending_monitor(conn, mon, mode) _push_position_snapshot_async(fast=False) return jsonify({"ok": ok, "message": msg}) @@ -1315,6 +1340,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode = get_trading_mode(get_setting) if not ctp_status(mode).get("connected"): return jsonify({"ok": False, "error": "请先连接 CTP"}), 400 + if not is_trading_session(): + return jsonify({"ok": False, "error": "不在交易时间段,无法撤单"}), 403 row = conn.execute( "SELECT * FROM trade_order_monitors WHERE id=? AND status='pending'", (monitor_id,), @@ -1327,6 +1354,25 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se finally: conn.close() + @app.route("/api/trading/order/cancel", methods=["POST"]) + @login_required + def api_trading_order_cancel(): + """撤销柜台未成交委托(按 vt_order_id)。""" + d = request.get_json(silent=True) or {} + order_id = (d.get("order_id") or "").strip() + if not order_id: + return jsonify({"ok": False, "error": "无效的委托号"}), 400 + mode = get_trading_mode(get_setting) + if not ctp_status(mode).get("connected"): + return jsonify({"ok": False, "error": "请先连接 CTP"}), 400 + if not is_trading_session(): + return jsonify({"ok": False, "error": "不在交易时间段,无法撤单"}), 403 + ok = ctp_cancel_order(mode, order_id) + _push_position_snapshot_async(fast=False) + if not ok: + return jsonify({"ok": False, "error": "撤单失败,委托可能已成交或已撤销"}), 400 + return jsonify({"ok": True, "message": "撤单已提交"}) + @app.route("/api/trading/close", methods=["POST"]) @login_required def api_trading_close(): @@ -1409,9 +1455,15 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se (mid,), ) conn.commit() + try: + from ctp_trade_sync import sync_trade_logs_from_ctp + sync_trade_logs_from_ctp(conn, mode, capital=capital, trading_mode=mode) + conn.commit() + except Exception as exc: + logger.debug("sync trades after close: %s", exc) conn.close() _push_position_snapshot_async() - return jsonify({"ok": True, "message": "已平仓并记入交易记录(手动平仓)"}) + return jsonify({"ok": True, "message": "已平仓;交易记录将按柜台成交同步"}) except ValueError as exc: conn.close() return jsonify({"ok": False, "error": str(exc)}), 400 diff --git a/order_pending.py b/order_pending.py index 175df38..a2f0757 100644 --- a/order_pending.py +++ b/order_pending.py @@ -7,6 +7,7 @@ from datetime import datetime from typing import Any, Callable, Optional from zoneinfo import ZoneInfo +from market_sessions import is_trading_session from vnpy_bridge import ctp_cancel_order, ctp_list_active_orders, ctp_status logger = logging.getLogger(__name__) @@ -132,7 +133,7 @@ def reconcile_pending_orders( continue if vt_oid and vt_oid in active_orders: - if age >= limit_sec: + if age >= limit_sec and is_trading_session(): if ctp_cancel_order(mode, vt_oid): conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", diff --git a/static/css/trade.css b/static/css/trade.css index f999071..5e813af 100644 --- a/static/css/trade.css +++ b/static/css/trade.css @@ -77,6 +77,7 @@ .pos-card.is-pending .pos-metrics .cell.pnl-pending label{color:var(--accent)} .pos-close-btn{padding:.4rem .85rem;font-size:.78rem;border-radius:8px;border:1px solid var(--loss);background:var(--loss-bg);color:var(--loss);cursor:pointer;white-space:nowrap;width:auto;flex-shrink:0;min-height:36px} .pos-close-btn:disabled,.pos-close-btn.is-session-off{opacity:.45;cursor:not-allowed;border-color:var(--text-muted);background:var(--card-inner);color:var(--text-muted)} +.pos-dismiss-btn:disabled,.pos-dismiss-btn.is-session-off{opacity:.45;cursor:not-allowed;color:var(--text-muted)} .pos-card-meta-line{font-size:.78rem;line-height:1.65;color:var(--text-muted);margin-bottom:.55rem} .pos-card-meta-line strong{color:var(--text)} .pos-card-actions{display:flex;gap:.35rem;flex-shrink:0;align-items:center} diff --git a/static/js/trade.js b/static/js/trade.js index f07e98b..3f4ef8f 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -204,17 +204,29 @@ if (pendingOnly.length) { list.innerHTML = '
暂无持仓
' + pendingOnly.map(function (p) { - var dismissBtn = p.monitor_id ? - '' : ''; + var cancelAllowed = p.cancel_allowed !== false && isTradingSession; + var actionBtn = ''; + if (p.monitor_id) { + actionBtn = ''; + } else if (p.order_id && p.source === 'ctp') { + actionBtn = ''; + } return ( '
' + (p.label || '挂单') + ' · ' + (p.symbol || p.symbol_code) + '' + '' + fmtNum(p.price) + ' · ' + - (p.lots || 1) + ' 手' + dismissBtn + '
' + (p.lots || 1) + ' 手' + actionBtn + '' ); }).join(''); bindPendingDismiss(list); + bindCancelOrderButtons(list); } else { list.innerHTML = '
暂无持仓。
'; } @@ -674,6 +686,10 @@ opts = opts || {}; if (!monitorId) return; var isPending = !!opts.pending; + if (isPending && !isTradingSession) { + alert('不在交易时间段,无法撤单'); + return; + } var confirmMsg = isPending ? '撤销该开仓委托?(将向柜台发送撤单)' : '取消该本地止盈止损监控?(不影响柜台委托)'; @@ -706,6 +722,10 @@ if (!root) return; root.querySelectorAll('[data-cancel-open]').forEach(function (btn) { btn.addEventListener('click', function () { + if (!isTradingSession) { + alert('不在交易时间段,无法撤单'); + return; + } dismissMonitor(parseInt(btn.getAttribute('data-cancel-open'), 10), btn, { pending: true }); }); }); @@ -715,7 +735,41 @@ if (!root) return; root.querySelectorAll('[data-monitor-id]').forEach(function (btn) { btn.addEventListener('click', function () { - dismissMonitor(parseInt(btn.getAttribute('data-monitor-id'), 10), btn); + var isPendingCancel = btn.getAttribute('data-pending-cancel') === '1'; + dismissMonitor( + parseInt(btn.getAttribute('data-monitor-id'), 10), + btn, + isPendingCancel ? { pending: true } : {} + ); + }); + }); + } + + function bindCancelOrderButtons(root) { + if (!root) return; + root.querySelectorAll('[data-cancel-order]').forEach(function (btn) { + btn.addEventListener('click', function () { + if (!isTradingSession) { + alert('不在交易时间段,无法撤单'); + return; + } + var orderId = decodeURIComponent(btn.getAttribute('data-cancel-order') || ''); + if (!orderId) return; + if (!confirm('撤销该柜台委托?')) return; + btn.disabled = true; + btn.textContent = '撤单中…'; + fetch('/api/trading/order/cancel', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ order_id: orderId }) + }).then(function (r) { return r.json(); }).then(function (d) { + if (!d.ok) throw new Error(d.error || d.message || '撤单失败'); + pollPositions(); + }).catch(function (e) { + alert(e.message || '撤单失败'); + btn.disabled = false; + btn.textContent = '撤单'; + }); }); }); } @@ -751,8 +805,11 @@ var remainMin = row.pending_timeout_min != null ? row.pending_timeout_min : (row.auto_cancel_sec != null ? Math.max(1, Math.ceil(row.auto_cancel_sec / 60)) : 5); + var cancelAllowed = row.cancel_allowed !== false && isTradingSession; var cancelBtn = row.can_cancel_order ? - '' : ''; + '' : ''; var metaLine = '状态 挂单中' + ' · 委托价 ' + fmtNum(orderPx) + '' + diff --git a/templates/records.html b/templates/records.html index 011d1e8..929792e 100644 --- a/templates/records.html +++ b/templates/records.html @@ -11,6 +11,14 @@

交易记录

+ {% if ctp_sync_info and ctp_sync_info.connected %} +

+ 已连接 CTP,本页已自动同步柜台成交(新增 {{ ctp_sync_info.synced or 0 }} 条,更新 {{ ctp_sync_info.updated or 0 }} 条)。 + 带来源「柜台」的记录以交易所成交为准;「本地」为程序写入,可手动删除错误项。 +

+ {% else %} +

CTP 未连接时仅显示本地数据库记录;连接后打开本页会自动同步柜台成交。

+ {% endif %}