diff --git a/app.py b/app.py index bad79d1..a5c094c 100644 --- a/app.py +++ b/app.py @@ -77,8 +77,8 @@ def today_str() -> str: def calc_holding_duration(open_time: str, close_time: str) -> str: try: - o = datetime.fromisoformat(open_time.strip()) - c = datetime.fromisoformat(close_time.strip()) + o = datetime.fromisoformat(open_time.strip().replace(" ", "T")[:19]) + c = datetime.fromisoformat(close_time.strip().replace(" ", "T")[:19]) delta = c - o if delta.total_seconds() < 0: return "" diff --git a/install_trading.py b/install_trading.py index b2e95d3..6e4ce8b 100644 --- a/install_trading.py +++ b/install_trading.py @@ -165,32 +165,31 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ths = _ctp_pos_to_ths_code(p) if not ths: continue - if _find_active_monitor(conn, ths, direction): + existing = _find_active_monitor(conn, ths, direction) + if existing: + _sync_monitor_lots_from_ctp( + conn, int(existing["id"]), ths, direction, mode, ctp=p, + ) continue - codes = ths_to_codes(ths) or {} - now_s = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - ensure_monitor_order_columns(conn) - conn.execute( - """INSERT INTO trade_order_monitors ( - symbol, symbol_name, market_code, direction, lots, entry_price, - stop_loss, take_profit, initial_stop_loss, trailing_be, - open_time, monitor_type, status - ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?, 'active')""", - ( - ths, - codes.get("name", ths) if codes else ths, - codes.get("market_code", "") if codes else "", - direction, - lots, - float(p.get("avg_price") or 0), - None, - None, - None, - 0, - now_s, - "ctp_sync", - ), + sl, tp, trailing_be, initial_sl = _restore_sl_tp_from_closed(conn, ths, direction) + ctp_open = (p.get("open_time") or "").strip() + mid = _upsert_open_monitor( + conn, + sym=ths, + direction=direction, + lots=lots, + price=float(p.get("avg_price") or 0), + sl=sl, + tp=tp, + trailing_be=trailing_be, + ctp_open_time=ctp_open or None, + monitor_type="ctp_sync", ) + if initial_sl is not None and sl is not None: + conn.execute( + "UPDATE trade_order_monitors SET initial_stop_loss=? WHERE id=?", + (initial_sl, mid), + ) def _match_ctp_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() @@ -216,10 +215,36 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _holding_duration(open_time: str, now_iso: str) -> str: try: from app import calc_holding_duration - return calc_holding_duration(open_time, now_iso) + open_s = (open_time or "").strip().replace("T", " ")[:19] + now_s = (now_iso or "").strip().replace("T", " ")[:19] + if not open_s or not now_s: + return "" + return calc_holding_duration(open_s, now_s) except Exception: return "" + def _restore_sl_tp_from_closed(conn, sym: str, direction: str) -> tuple: + """重启后从最近关闭的同品种监控恢复止盈止损。""" + direction = (direction or "long").strip().lower() + for r in conn.execute( + "SELECT symbol, direction, stop_loss, take_profit, trailing_be, initial_stop_loss " + "FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 80" + ).fetchall(): + row = dict(r) + if (row.get("direction") or "long") != direction: + continue + if not _match_ctp_symbol(sym, row.get("symbol") or ""): + continue + if row.get("stop_loss") is None and row.get("take_profit") is None: + continue + return ( + row.get("stop_loss"), + row.get("take_profit"), + int(row.get("trailing_be") or 0), + row.get("initial_stop_loss"), + ) + return None, None, 0, None + def _ctp_position_keys(mode: str) -> set[tuple[str, str]]: keys: set[tuple[str, str]] = set() for p in _ctp_positions(mode): @@ -362,6 +387,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se sl, tp, trailing_be: int, + ctp_open_time: Optional[str] = None, + monitor_type: str = "manual", ) -> int: ensure_monitor_order_columns(conn) codes = ths_to_codes(sym) or {} @@ -372,8 +399,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if existing: mid = int(existing["id"]) initial_sl = existing.get("initial_stop_loss") + if sl_f is None: + sl_f = float(existing["stop_loss"]) if existing.get("stop_loss") is not None else None + if tp_f is None: + tp_f = float(existing["take_profit"]) if existing.get("take_profit") is not None else None if sl_f is not None and initial_sl is None: initial_sl = sl_f + if not trailing_be: + trailing_be = int(existing.get("trailing_be") or 0) + open_time_val = existing.get("open_time") or now_s + if ctp_open_time: + prev = (open_time_val or "")[:19] + if not prev or ctp_open_time < prev: + open_time_val = ctp_open_time conn.execute( """UPDATE trade_order_monitors SET symbol=?, symbol_name=?, market_code=?, lots=?, entry_price=?, @@ -389,11 +427,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se tp_f, initial_sl, trailing_be, - now_s, + open_time_val, mid, ), ) else: + open_time_val = ctp_open_time or now_s conn.execute( """INSERT INTO trade_order_monitors ( symbol, symbol_name, market_code, direction, lots, entry_price, @@ -411,25 +450,45 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se tp_f, sl_f, trailing_be, - now_s, - "manual", + open_time_val, + monitor_type, ), ) mid = int(conn.execute("SELECT last_insert_rowid()").fetchone()[0]) _close_duplicate_monitors(conn, sym, direction, mid) return mid - def _sync_monitor_lots_from_ctp(conn, mid: int, sym: str, direction: str, mode: str) -> None: - for p in _ctp_positions(mode): - if int(p.get("lots") or 0) <= 0: + def _sync_monitor_lots_from_ctp( + conn, mid: int, sym: str, direction: str, mode: str, *, ctp: Optional[dict] = None, + ) -> None: + positions = [ctp] if ctp else _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False) + for p in positions: + if not p or int(p.get("lots") or 0) <= 0: continue if (p.get("direction") or "long") != direction: continue if not _match_ctp_symbol(p.get("symbol") or "", sym): continue + ctp_open = (p.get("open_time") or "").strip() or None + row = conn.execute( + "SELECT open_time FROM trade_order_monitors WHERE id=?", (mid,), + ).fetchone() + db_open = (row["open_time"] or "").strip() if row else "" + open_time_val = db_open or ctp_open + if ctp_open and db_open: + if ctp_open < db_open[:19]: + open_time_val = ctp_open + elif ctp_open: + open_time_val = ctp_open conn.execute( - "UPDATE trade_order_monitors SET lots=?, entry_price=? WHERE id=?", - (int(p.get("lots") or 0), float(p.get("avg_price") or 0), mid), + """UPDATE trade_order_monitors SET lots=?, entry_price=?, + open_time=? WHERE id=?""", + ( + int(p.get("lots") or 0), + float(p.get("avg_price") or 0), + open_time_val, + mid, + ), ) return @@ -470,7 +529,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se tick = calc_order_tick_metrics(sym, lots, entry) sl = float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None - open_time = (mon.get("open_time") or "") if mon else "" + ctp_open = (ctp.get("open_time") or "").strip() if ctp else "" + open_time = ctp_open or ((mon.get("open_time") or "") if mon else "") holding = _holding_duration(open_time, now_iso) if open_time else "" mark = None @@ -627,7 +687,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if ctp and mon: _sync_monitor_lots_from_ctp( conn, int(mon["id"]), mon.get("symbol") or "", - mon.get("direction") or "long", mode, + mon.get("direction") or "long", mode, ctp=ctp, ) mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon try: diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 588da81..10bc47d 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -20,6 +20,7 @@ from vnpy_bridge import ( ctp_list_positions, ctp_status, execute_order, + get_bridge, ) logger = logging.getLogger(__name__) @@ -478,7 +479,7 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120 """持仓已平时:关闭监控并撤销残留止盈止损挂单(新开仓 grace_sec 内不清理)。""" if not ctp_status(mode).get("connected"): return 0 - positions = ctp_list_positions(mode) + positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False) position_keys: set[tuple[str, str]] = set() for p in positions: if int(p.get("lots") or 0) <= 0: @@ -487,6 +488,15 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120 direction = p.get("direction") or "long" position_keys.add((sym, direction)) + if not position_keys: + try: + acc = get_bridge().get_account() + margin_used = float(acc.get("balance") or 0) - float(acc.get("available") or 0) + if margin_used > 500: + return 0 + except Exception: + return 0 + now_ts = time.time() def _monitor_within_grace(mon: dict) -> bool: diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 004f143..faf7d20 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -124,6 +124,7 @@ class CtpBridge: self._subscribed: set[str] = set() self._last_position_query_ts: float = 0.0 self._position_margins: dict[str, float] = {} + self._position_open_times: dict[str, str] = {} self._margin_hooked = False self._tick_hooked = False self._bar_generators: dict[str, Any] = {} @@ -727,6 +728,16 @@ class CtpBridge: def _position_margin_key(self, sym: str, direction: str) -> str: return f"{(sym or '').lower()}:{(direction or 'long').strip().lower()}" + def _lookup_position_open_time(self, sym: str, direction: str) -> str: + return (self._position_open_times.get(self._position_margin_key(sym, direction)) or "").strip() + + @staticmethod + def _parse_ctp_open_date(raw: str) -> str: + s = (raw or "").strip() + if len(s) >= 8 and s[:8].isdigit(): + return f"{s[:4]}-{s[4:6]}-{s[6:8]} 09:00:00" + return "" + def _install_position_margin_hook(self) -> None: """拦截 CTP 持仓回报,缓存柜台 UseMargin。""" if self._margin_hooked or not self._engine: @@ -758,6 +769,14 @@ class CtpBridge: bridge._position_margins[k] = ( bridge._position_margins.get(k, 0.0) + margin ) + open_date = bridge._parse_ctp_open_date( + str(data.get("OpenDate") or data.get("open_date") or "") + ) + if sym and open_date: + k = bridge._position_margin_key(sym, d) + prev = bridge._position_open_times.get(k, "") + if not prev or open_date < prev: + bridge._position_open_times[k] = open_date except Exception as exc: logger.debug("margin hook row: %s", exc) return original(data, error, reqid, last) @@ -783,6 +802,7 @@ class CtpBridge: exchange = getattr(pos, "exchange", None) ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "") margin = self._lookup_position_margin(sym, d) + open_time = self._lookup_position_open_time(sym, d) or None out.append({ "symbol": sym, "exchange": ex_name, @@ -792,6 +812,7 @@ class CtpBridge: "pnl": float(getattr(pos, "pnl", 0) or 0), "frozen": int(getattr(pos, "frozen", 0) or 0), "margin": round(margin, 2) if margin > 0 else None, + "open_time": open_time, }) return out @@ -809,6 +830,7 @@ class CtpBridge: td = getattr(gw, "td_api", None) if td and hasattr(td, "query_position"): self._position_margins.clear() + self._position_open_times.clear() td.query_position() time.sleep(0.4) except Exception as exc: