fix: 开仓时间读CTP OpenDate,止盈止损持久化且重启不丢失

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-25 15:05:58 +08:00
parent 7daed9bd3a
commit 4d60b958ce
4 changed files with 130 additions and 38 deletions
+2 -2
View File
@@ -77,8 +77,8 @@ def today_str() -> str:
def calc_holding_duration(open_time: str, close_time: str) -> str:
try:
o = datetime.fromisoformat(open_time.strip())
c = datetime.fromisoformat(close_time.strip())
o = datetime.fromisoformat(open_time.strip().replace(" ", "T")[:19])
c = datetime.fromisoformat(close_time.strip().replace(" ", "T")[:19])
delta = c - o
if delta.total_seconds() < 0:
return ""
+94 -34
View File
@@ -165,31 +165,30 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
ths = _ctp_pos_to_ths_code(p)
if not ths:
continue
if _find_active_monitor(conn, ths, direction):
existing = _find_active_monitor(conn, ths, direction)
if existing:
_sync_monitor_lots_from_ctp(
conn, int(existing["id"]), ths, direction, mode, ctp=p,
)
continue
codes = ths_to_codes(ths) or {}
now_s = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
ensure_monitor_order_columns(conn)
sl, tp, trailing_be, initial_sl = _restore_sl_tp_from_closed(conn, ths, direction)
ctp_open = (p.get("open_time") or "").strip()
mid = _upsert_open_monitor(
conn,
sym=ths,
direction=direction,
lots=lots,
price=float(p.get("avg_price") or 0),
sl=sl,
tp=tp,
trailing_be=trailing_be,
ctp_open_time=ctp_open or None,
monitor_type="ctp_sync",
)
if initial_sl is not None and sl is not None:
conn.execute(
"""INSERT INTO trade_order_monitors (
symbol, symbol_name, market_code, direction, lots, entry_price,
stop_loss, take_profit, initial_stop_loss, trailing_be,
open_time, monitor_type, status
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?, 'active')""",
(
ths,
codes.get("name", ths) if codes else ths,
codes.get("market_code", "") if codes else "",
direction,
lots,
float(p.get("avg_price") or 0),
None,
None,
None,
0,
now_s,
"ctp_sync",
),
"UPDATE trade_order_monitors SET initial_stop_loss=? WHERE id=?",
(initial_sl, mid),
)
def _match_ctp_symbol(ctp_sym: str, ths: str) -> bool:
@@ -216,10 +215,36 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def _holding_duration(open_time: str, now_iso: str) -> str:
try:
from app import calc_holding_duration
return calc_holding_duration(open_time, now_iso)
open_s = (open_time or "").strip().replace("T", " ")[:19]
now_s = (now_iso or "").strip().replace("T", " ")[:19]
if not open_s or not now_s:
return ""
return calc_holding_duration(open_s, now_s)
except Exception:
return ""
def _restore_sl_tp_from_closed(conn, sym: str, direction: str) -> tuple:
"""重启后从最近关闭的同品种监控恢复止盈止损。"""
direction = (direction or "long").strip().lower()
for r in conn.execute(
"SELECT symbol, direction, stop_loss, take_profit, trailing_be, initial_stop_loss "
"FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 80"
).fetchall():
row = dict(r)
if (row.get("direction") or "long") != direction:
continue
if not _match_ctp_symbol(sym, row.get("symbol") or ""):
continue
if row.get("stop_loss") is None and row.get("take_profit") is None:
continue
return (
row.get("stop_loss"),
row.get("take_profit"),
int(row.get("trailing_be") or 0),
row.get("initial_stop_loss"),
)
return None, None, 0, None
def _ctp_position_keys(mode: str) -> set[tuple[str, str]]:
keys: set[tuple[str, str]] = set()
for p in _ctp_positions(mode):
@@ -362,6 +387,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
sl,
tp,
trailing_be: int,
ctp_open_time: Optional[str] = None,
monitor_type: str = "manual",
) -> int:
ensure_monitor_order_columns(conn)
codes = ths_to_codes(sym) or {}
@@ -372,8 +399,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if existing:
mid = int(existing["id"])
initial_sl = existing.get("initial_stop_loss")
if sl_f is None:
sl_f = float(existing["stop_loss"]) if existing.get("stop_loss") is not None else None
if tp_f is None:
tp_f = float(existing["take_profit"]) if existing.get("take_profit") is not None else None
if sl_f is not None and initial_sl is None:
initial_sl = sl_f
if not trailing_be:
trailing_be = int(existing.get("trailing_be") or 0)
open_time_val = existing.get("open_time") or now_s
if ctp_open_time:
prev = (open_time_val or "")[:19]
if not prev or ctp_open_time < prev:
open_time_val = ctp_open_time
conn.execute(
"""UPDATE trade_order_monitors SET
symbol=?, symbol_name=?, market_code=?, lots=?, entry_price=?,
@@ -389,11 +427,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
tp_f,
initial_sl,
trailing_be,
now_s,
open_time_val,
mid,
),
)
else:
open_time_val = ctp_open_time or now_s
conn.execute(
"""INSERT INTO trade_order_monitors (
symbol, symbol_name, market_code, direction, lots, entry_price,
@@ -411,25 +450,45 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
tp_f,
sl_f,
trailing_be,
now_s,
"manual",
open_time_val,
monitor_type,
),
)
mid = int(conn.execute("SELECT last_insert_rowid()").fetchone()[0])
_close_duplicate_monitors(conn, sym, direction, mid)
return mid
def _sync_monitor_lots_from_ctp(conn, mid: int, sym: str, direction: str, mode: str) -> None:
for p in _ctp_positions(mode):
if int(p.get("lots") or 0) <= 0:
def _sync_monitor_lots_from_ctp(
conn, mid: int, sym: str, direction: str, mode: str, *, ctp: Optional[dict] = None,
) -> None:
positions = [ctp] if ctp else _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False)
for p in positions:
if not p or int(p.get("lots") or 0) <= 0:
continue
if (p.get("direction") or "long") != direction:
continue
if not _match_ctp_symbol(p.get("symbol") or "", sym):
continue
ctp_open = (p.get("open_time") or "").strip() or None
row = conn.execute(
"SELECT open_time FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
db_open = (row["open_time"] or "").strip() if row else ""
open_time_val = db_open or ctp_open
if ctp_open and db_open:
if ctp_open < db_open[:19]:
open_time_val = ctp_open
elif ctp_open:
open_time_val = ctp_open
conn.execute(
"UPDATE trade_order_monitors SET lots=?, entry_price=? WHERE id=?",
(int(p.get("lots") or 0), float(p.get("avg_price") or 0), mid),
"""UPDATE trade_order_monitors SET lots=?, entry_price=?,
open_time=? WHERE id=?""",
(
int(p.get("lots") or 0),
float(p.get("avg_price") or 0),
open_time_val,
mid,
),
)
return
@@ -470,7 +529,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
tick = calc_order_tick_metrics(sym, lots, entry)
sl = float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None
tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None
open_time = (mon.get("open_time") or "") if mon else ""
ctp_open = (ctp.get("open_time") or "").strip() if ctp else ""
open_time = ctp_open or ((mon.get("open_time") or "") if mon else "")
holding = _holding_duration(open_time, now_iso) if open_time else ""
mark = None
@@ -627,7 +687,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if ctp and mon:
_sync_monitor_lots_from_ctp(
conn, int(mon["id"]), mon.get("symbol") or "",
mon.get("direction") or "long", mode,
mon.get("direction") or "long", mode, ctp=ctp,
)
mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon
try:
+11 -1
View File
@@ -20,6 +20,7 @@ from vnpy_bridge import (
ctp_list_positions,
ctp_status,
execute_order,
get_bridge,
)
logger = logging.getLogger(__name__)
@@ -478,7 +479,7 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120
"""持仓已平时:关闭监控并撤销残留止盈止损挂单(新开仓 grace_sec 内不清理)。"""
if not ctp_status(mode).get("connected"):
return 0
positions = ctp_list_positions(mode)
positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False)
position_keys: set[tuple[str, str]] = set()
for p in positions:
if int(p.get("lots") or 0) <= 0:
@@ -487,6 +488,15 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120
direction = p.get("direction") or "long"
position_keys.add((sym, direction))
if not position_keys:
try:
acc = get_bridge().get_account()
margin_used = float(acc.get("balance") or 0) - float(acc.get("available") or 0)
if margin_used > 500:
return 0
except Exception:
return 0
now_ts = time.time()
def _monitor_within_grace(mon: dict) -> bool:
+22
View File
@@ -124,6 +124,7 @@ class CtpBridge:
self._subscribed: set[str] = set()
self._last_position_query_ts: float = 0.0
self._position_margins: dict[str, float] = {}
self._position_open_times: dict[str, str] = {}
self._margin_hooked = False
self._tick_hooked = False
self._bar_generators: dict[str, Any] = {}
@@ -727,6 +728,16 @@ class CtpBridge:
def _position_margin_key(self, sym: str, direction: str) -> str:
return f"{(sym or '').lower()}:{(direction or 'long').strip().lower()}"
def _lookup_position_open_time(self, sym: str, direction: str) -> str:
return (self._position_open_times.get(self._position_margin_key(sym, direction)) or "").strip()
@staticmethod
def _parse_ctp_open_date(raw: str) -> str:
s = (raw or "").strip()
if len(s) >= 8 and s[:8].isdigit():
return f"{s[:4]}-{s[4:6]}-{s[6:8]} 09:00:00"
return ""
def _install_position_margin_hook(self) -> None:
"""拦截 CTP 持仓回报,缓存柜台 UseMargin。"""
if self._margin_hooked or not self._engine:
@@ -758,6 +769,14 @@ class CtpBridge:
bridge._position_margins[k] = (
bridge._position_margins.get(k, 0.0) + margin
)
open_date = bridge._parse_ctp_open_date(
str(data.get("OpenDate") or data.get("open_date") or "")
)
if sym and open_date:
k = bridge._position_margin_key(sym, d)
prev = bridge._position_open_times.get(k, "")
if not prev or open_date < prev:
bridge._position_open_times[k] = open_date
except Exception as exc:
logger.debug("margin hook row: %s", exc)
return original(data, error, reqid, last)
@@ -783,6 +802,7 @@ class CtpBridge:
exchange = getattr(pos, "exchange", None)
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
margin = self._lookup_position_margin(sym, d)
open_time = self._lookup_position_open_time(sym, d) or None
out.append({
"symbol": sym,
"exchange": ex_name,
@@ -792,6 +812,7 @@ class CtpBridge:
"pnl": float(getattr(pos, "pnl", 0) or 0),
"frozen": int(getattr(pos, "frozen", 0) or 0),
"margin": round(margin, 2) if margin > 0 else None,
"open_time": open_time,
})
return out
@@ -809,6 +830,7 @@ class CtpBridge:
td = getattr(gw, "td_api", None)
if td and hasattr(td, "query_position"):
self._position_margins.clear()
self._position_open_times.clear()
td.query_position()
time.sleep(0.4)
except Exception as exc: