diff --git a/ctp_symbol.py b/ctp_symbol.py index e30cd83..4fd2a6c 100644 --- a/ctp_symbol.py +++ b/ctp_symbol.py @@ -32,8 +32,12 @@ def ths_to_vnpy_symbol(ths_code: str) -> Tuple[str, str]: """ code = (ths_code or "").strip() codes = ths_to_codes(code) - ex = (codes.get("ex") if codes else None) or "SHFE" - ex = _EX_MAP.get(ex, "SHFE") + ex = (codes.get("ex") if codes else None) + if not ex and codes: + mc = (codes.get("market_code") or "") + if "." in mc: + ex = mc.rsplit(".", 1)[-1] + ex = _EX_MAP.get(ex or "SHFE", "SHFE") m = re.match(r"^([A-Za-z]+)(\d+)$", code) if not m: return code, ex diff --git a/ctp_trade_sync.py b/ctp_trade_sync.py index 992a2b0..f8f5ea5 100644 --- a/ctp_trade_sync.py +++ b/ctp_trade_sync.py @@ -16,7 +16,7 @@ from contract_specs import calc_position_metrics from ctp_symbol import ths_to_vnpy_symbol from fee_specs import calc_round_trip_fee from symbols import ths_to_codes -from trade_log_lib import calc_equity_after, ensure_trade_log_columns +from trade_log_lib import calc_equity_after, purge_duplicate_local_trade_logs, ensure_trade_log_columns from vnpy_bridge import ctp_list_trades, ctp_status logger = logging.getLogger(__name__) @@ -292,4 +292,7 @@ def sync_trade_logs_from_ctp( refresh_stats_cache(conn, capital) except Exception as exc: logger.debug("stats refresh after ctp trade sync: %s", exc) + purged = purge_duplicate_local_trade_logs(conn) + if purged: + stats["purged"] = purged return stats diff --git a/install_trading.py b/install_trading.py index e3d8d7e..5890593 100644 --- a/install_trading.py +++ b/install_trading.py @@ -1639,23 +1639,25 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se lots=lots, price=price, settings=_settings_dict(), order_type="market", ) - write_manual_close_trade_log( - conn, - mon, - symbol=sym, - direction=direction, - lots=lots, - close_price=price, - entry_price=entry or price, - trading_mode=mode, - capital=capital, - stop_loss=float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None, - take_profit=float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None, - open_time=(mon.get("open_time") or "") if mon else "", - symbol_name=(mon.get("symbol_name") or "") if mon else "", - market_code=(mon.get("market_code") or "") if mon else "", - ) + if not ctp_status(mode).get("connected"): + write_manual_close_trade_log( + conn, + mon, + symbol=sym, + direction=direction, + lots=lots, + close_price=price, + entry_price=entry or price, + trading_mode=mode, + capital=capital, + stop_loss=float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None, + take_profit=float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None, + open_time=(mon.get("open_time") or "") if mon else "", + symbol_name=(mon.get("symbol_name") or "") if mon else "", + market_code=(mon.get("market_code") or "") if mon else "", + ) if mid: + _close_duplicate_monitors(conn, sym, direction, mid) conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,), diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 391b1e2..721423f 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -39,6 +39,7 @@ PLACE_COOLDOWN_SEC = 3 _last_close_attempt: dict[int, float] = {} _closing_monitors: set[int] = set() +_closing_symbol_keys: set[str] = set() _closing_lock = threading.Lock() MONITOR_ORDER_COLUMNS = ( @@ -137,6 +138,69 @@ def _find_position(positions: list[dict], ths_code: str, direction: str) -> Opti return None +def _position_key(sym: str, direction: str) -> str: + return f"{(sym or '').strip().lower()}|{(direction or 'long').strip().lower()}" + + +def _try_acquire_close_symbol(sym: str, direction: str) -> bool: + key = _position_key(sym, direction) + with _closing_lock: + if key in _closing_symbol_keys: + return False + _closing_symbol_keys.add(key) + return True + + +def _release_close_symbol(sym: str, direction: str) -> None: + key = _position_key(sym, direction) + with _closing_lock: + _closing_symbol_keys.discard(key) + + +def _close_all_monitors_for_symbol(conn, sym: str, direction: str) -> None: + direction = (direction or "long").strip().lower() + for r in conn.execute( + "SELECT id, symbol, direction FROM trade_order_monitors WHERE status='active'" + ).fetchall(): + if (r["direction"] or "long") != direction: + continue + if _match_symbol(sym, r["symbol"] or ""): + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (r["id"],), + ) + + +def _dedupe_active_monitors(conn) -> None: + """同一品种方向只保留一条 active 监控,避免重复触发平仓。""" + groups: dict[str, list[dict]] = {} + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id ASC" + ).fetchall(): + row = dict(r) + key = _position_key(row.get("symbol") or "", row.get("direction") or "long") + groups.setdefault(key, []).append(row) + for items in groups.values(): + if len(items) <= 1: + continue + + def _keep_score(m: dict) -> tuple: + mt = (m.get("monitor_type") or "").lower() + score = 0 + if mt != "ctp_sync": + score += 10 + if m.get("stop_loss") is not None: + score += 5 + return (score, int(m.get("id") or 0)) + + items.sort(key=_keep_score, reverse=True) + for dup in items[1:]: + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (dup["id"],), + ) + + def _can_close_now(monitor_id: int, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool: last = _last_close_attempt.get(monitor_id, 0.0) return (time.time() - last) >= cooldown @@ -575,6 +639,7 @@ def _execute_local_close( positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: + _close_all_monitors_for_symbol(conn, sym, direction) reconcile_monitors_without_position(conn, mode) return lots = int(pos.get("lots") or mon.get("lots") or 1) @@ -590,24 +655,16 @@ def _execute_local_close( price=mark, order_type="market", ) - _write_trade_log( - conn, - mon, - close_price=mark, - reason=reason, - trading_mode=mode, - capital=capital, - ) - conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],)) + _close_all_monitors_for_symbol(conn, sym, direction) conn.commit() result_label = _result_for_close(mon, reason) logger.info( - "止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s", + "止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s(待 CTP 成交同步写入交易记录)", mon.get("id"), result_label, sym, direction, lots, mark, ) if notify_fn: try: - notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},已记入交易记录") + notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},平仓委托已提交") except Exception as exc: logger.debug("SL/TP notify failed: %s", exc) @@ -627,12 +684,12 @@ def check_monitors_locally( if not is_trading_session(): return 0 reconcile_monitors_without_position(conn, mode) + _dedupe_active_monitors(conn) conn.commit() closed = 0 rows = [dict(r) for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active'" ).fetchall()] - conn.commit() for mon in rows: mid = int(mon.get("id") or 0) sym = (mon.get("symbol") or "").strip() @@ -677,7 +734,7 @@ def check_monitors_locally( continue if mid > 0 and not _can_close_now(mid): continue - if mid > 0 and not _try_acquire_close(mid): + if not _try_acquire_close_symbol(sym, direction): continue try: _execute_local_close( @@ -695,8 +752,7 @@ def check_monitors_locally( except Exception as exc: logger.warning("SL/TP local close failed monitor=%s: %s", mid, exc) finally: - if mid > 0: - _release_close(mid) + _release_close_symbol(sym, direction) return closed diff --git a/symbols.py b/symbols.py index fc719e0..e4662d9 100644 --- a/symbols.py +++ b/symbols.py @@ -140,6 +140,17 @@ def _find_product_by_letters(letters: str) -> Optional[dict]: return None +def _product_codes(product: dict, ths_code: str, market_code: str, sina_code: str) -> dict: + return { + "ths_code": ths_code, + "market_code": market_code, + "sina_code": sina_code, + "ex": product["ex"], + "name": product["name"], + "exchange": product["exchange"], + } + + def ths_to_codes(ths_code: str) -> Optional[dict]: """同花顺合约代码 -> ths_full + sina 回退代码。""" code = ths_code.strip() @@ -155,11 +166,13 @@ def ths_to_codes(ths_code: str) -> Optional[dict]: return None product = _find_product_by_letters(letters) if product: - return { - "ths_code": build_ths_code(product, year, month), - "market_code": build_ths_full_code(product, year, month), - "sina_code": build_sina_code(product, year, month), - } + ths = build_ths_code(product, year, month) + return _product_codes( + product, + ths, + build_ths_full_code(product, year, month), + build_sina_code(product, year, month), + ) letters_up = letters.upper() if letters_up in ("IF", "IH", "IC", "IM", "T", "TF", "TS"): ths = f"{letters_up}{digits}" @@ -167,6 +180,9 @@ def ths_to_codes(ths_code: str) -> Optional[dict]: "ths_code": ths, "market_code": f"{ths}.CFFEX", "sina_code": f"CFF_RE_{letters_up}{digits}", + "ex": "CFFEX", + "name": letters_up, + "exchange": "中金所", } m3 = re.match(r"^([A-Za-z]+)(\d{3})$", code) @@ -183,11 +199,13 @@ def ths_to_codes(ths_code: str) -> Optional[dict]: candidate += 10 product = _find_product_by_letters(letters) if product: - return { - "ths_code": build_ths_code(product, candidate, month), - "market_code": build_ths_full_code(product, candidate, month), - "sina_code": build_sina_code(product, candidate, month), - } + ths = build_ths_code(product, candidate, month) + return _product_codes( + product, + ths, + build_ths_full_code(product, candidate, month), + build_sina_code(product, candidate, month), + ) return None diff --git a/trade_log_lib.py b/trade_log_lib.py index 0fb5957..bf207c0 100644 --- a/trade_log_lib.py +++ b/trade_log_lib.py @@ -32,6 +32,51 @@ def calc_equity_after(capital: float, pnl_net: float) -> float | None: return round(cap + float(pnl_net or 0), 2) +def _norm_symbol(symbol: str) -> str: + return (symbol or "").split(".")[0].strip().lower() + + +def _norm_close_minute(ts: str) -> str: + """统一 close_time 到分钟粒度,兼容 ISO `T` 与空格分隔。""" + return (ts or "").strip().replace("T", " ")[:16] + + +def purge_duplicate_local_trade_logs(conn) -> int: + """删除已被 CTP 柜台记录覆盖的本地重复成交。""" + removed = 0 + ctp_rows = [ + dict(r) + for r in conn.execute("SELECT * FROM trade_logs WHERE source='ctp'").fetchall() + ] + local_rows = [ + dict(r) + for r in conn.execute( + """SELECT * FROM trade_logs + WHERE COALESCE(source, 'local') != 'ctp' + AND (ctp_trade_key IS NULL OR ctp_trade_key = '')""" + ).fetchall() + ] + for ctp in ctp_rows: + ct16 = _norm_close_minute(ctp.get("close_time") or "") + sym_n = _norm_symbol(ctp.get("symbol") or "") + lots = float(ctp.get("lots") or 0) + direction = (ctp.get("direction") or "long").strip().lower() + for loc in local_rows: + if loc.get("id") == ctp.get("id"): + continue + if _norm_symbol(loc.get("symbol") or "") != sym_n: + continue + if (loc.get("direction") or "long").strip().lower() != direction: + continue + if _norm_close_minute(loc.get("close_time") or "") != ct16: + continue + if abs(float(loc.get("lots") or 0) - lots) > 0.01: + continue + conn.execute("DELETE FROM trade_logs WHERE id=?", (loc["id"],)) + removed += 1 + return removed + + def enrich_trades_for_records( trades: list[dict[str, Any]], *, diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 580a0fb..38cb5ac 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -556,7 +556,21 @@ class CtpBridge: return Offset.CLOSE pos = self._find_position(sym, ex_u, hold_direction) if not pos: - # 找不到持仓明细时,日盘新开仓优先平今(避免 SHFE「平昨仓位不足」) + for p in self._collect_positions(): + ps = (p.get("symbol") or "").lower() + if ps != sym.lower(): + continue + if (p.get("direction") or "long") != hold_direction: + continue + td = int(p.get("td_volume") or 0) + yd = int(p.get("yd_volume") or 0) + if td >= lots: + return Offset.CLOSETODAY + if yd >= lots: + return Offset.CLOSEYESTERDAY + if td + yd >= lots: + return Offset.CLOSETODAY + break if ex_u in ("SHFE", "INE", "CZCE"): return Offset.CLOSETODAY return Offset.CLOSE @@ -1112,6 +1126,8 @@ class CtpBridge: sym, ex_name, d, vol, price, pos=pos, ) open_time = self._lookup_position_open_time(sym, d) or None + yd = int(getattr(pos, "yd_volume", 0) or 0) + td = max(0, vol - yd) out.append({ "symbol": sym, "exchange": ex_name, @@ -1122,6 +1138,8 @@ class CtpBridge: "frozen": int(getattr(pos, "frozen", 0) or 0), "margin": margin, "open_time": open_time, + "yd_volume": yd, + "td_volume": td, }) return out