Implement CTP-authoritative trading UI with event-driven state.
Add in-memory order/position books fed by CTP events, split active orders above positions in the UI, tick-triggered local SL/TP, and 30-second full calibration. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+428
-134
@@ -84,6 +84,7 @@ from trading_context import (
|
||||
trading_mode_label,
|
||||
)
|
||||
from ctp_symbol import ths_to_vnpy_symbol
|
||||
from ctp_trading_state import position_key, trading_state
|
||||
from vnpy_bridge import (
|
||||
_ctp_td_lock,
|
||||
ctp_cancel_order,
|
||||
@@ -99,6 +100,7 @@ from vnpy_bridge import (
|
||||
execute_order,
|
||||
get_bridge,
|
||||
set_position_refresh_callback,
|
||||
set_tick_sl_tp_callback,
|
||||
)
|
||||
|
||||
|
||||
@@ -207,11 +209,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
|
||||
def _ctp_pos_to_ths_code(p: dict) -> str:
|
||||
sym = (p.get("symbol") or "").strip()
|
||||
ex = (p.get("exchange") or "").strip()
|
||||
if not sym:
|
||||
return ""
|
||||
codes = ths_to_codes(sym)
|
||||
if codes:
|
||||
return codes.get("ths_code") or sym
|
||||
if ex:
|
||||
from vnpy_bridge import CtpBridge
|
||||
ths = CtpBridge._vnpy_sym_to_ths(sym, ex)
|
||||
if ths:
|
||||
return ths
|
||||
return sym
|
||||
|
||||
def _resolve_position_margin(
|
||||
@@ -334,7 +342,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
"""CTP 有持仓但本地无监控时,自动补写一条 active 记录供展示。"""
|
||||
if not ctp_status(mode).get("connected"):
|
||||
return
|
||||
for p in _ctp_positions(mode, refresh_if_empty=True):
|
||||
ctp_positions = _ctp_positions(mode, refresh_if_empty=True)
|
||||
for p in ctp_positions:
|
||||
lots = int(p.get("lots") or 0)
|
||||
if lots <= 0:
|
||||
continue
|
||||
@@ -342,7 +351,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
ths = _ctp_pos_to_ths_code(p)
|
||||
if not ths:
|
||||
continue
|
||||
existing = _find_active_monitor(conn, ths, direction)
|
||||
existing = _find_or_revive_monitor(conn, ths, direction)
|
||||
if existing:
|
||||
_sync_monitor_from_ctp(
|
||||
conn, int(existing["id"]), ths, direction, mode, ctp=p,
|
||||
@@ -368,6 +377,43 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
"UPDATE trade_order_monitors SET initial_stop_loss=? WHERE id=?",
|
||||
(initial_sl, mid),
|
||||
)
|
||||
if ctp_positions:
|
||||
return
|
||||
|
||||
def _restore_recent_pending_monitors(conn, mode: str) -> None:
|
||||
"""重启或 vnpy 委托缓存丢失时,恢复当日最近一笔可能仍有效的开仓挂单。"""
|
||||
if not ctp_status(mode).get("connected"):
|
||||
return
|
||||
if conn.execute("SELECT 1 FROM trade_order_monitors WHERE status='pending' LIMIT 1").fetchone():
|
||||
return
|
||||
today = datetime.now().strftime("%Y-%m-%d")
|
||||
row = conn.execute(
|
||||
"""SELECT * FROM trade_order_monitors
|
||||
WHERE status='closed' AND monitor_type='manual'
|
||||
AND vt_order_id IS NOT NULL AND vt_order_id != ''
|
||||
AND open_time LIKE ?
|
||||
ORDER BY id DESC LIMIT 1""",
|
||||
(f"{today}%",),
|
||||
).fetchone()
|
||||
if not row:
|
||||
return
|
||||
mon = dict(row)
|
||||
sym = mon.get("symbol") or ""
|
||||
direction = (mon.get("direction") or "long").strip().lower()
|
||||
if _find_active_monitor(conn, sym, direction):
|
||||
return
|
||||
for p in _ctp_positions(mode, refresh_if_empty=False):
|
||||
if int(p.get("lots") or 0) <= 0:
|
||||
continue
|
||||
if (p.get("direction") or "long") != direction:
|
||||
continue
|
||||
if _match_ctp_symbol(p.get("symbol") or "", sym):
|
||||
return
|
||||
conn.execute(
|
||||
"UPDATE trade_order_monitors SET status='pending' WHERE id=?",
|
||||
(mon["id"],),
|
||||
)
|
||||
logger.info("恢复挂单监控 id=%s sym=%s", mon.get("id"), sym)
|
||||
|
||||
def _match_ctp_symbol(ctp_sym: str, ths: str) -> bool:
|
||||
a = (ctp_sym or "").lower()
|
||||
@@ -540,7 +586,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
ctp_st = ctp_status(mode)
|
||||
if ctp_st.get("connected"):
|
||||
for o in _ctp_active_orders(mode):
|
||||
sym = o.get("symbol") or ""
|
||||
sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "")
|
||||
offset_s = (o.get("offset") or "").upper()
|
||||
kind = "limit"
|
||||
label = "委托挂单"
|
||||
@@ -548,6 +594,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
label = "平仓委托"
|
||||
pending.append({
|
||||
"symbol_code": sym,
|
||||
"symbol": _symbol_display_fields(sym).get("symbol_name") or sym,
|
||||
"direction": o.get("direction") or "long",
|
||||
"direction_label": "做多" if o.get("direction") == "long" else "做空",
|
||||
"lots": int(o.get("lots") or 0),
|
||||
@@ -556,6 +603,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
"label": label,
|
||||
"source": "ctp",
|
||||
"order_id": o.get("order_id"),
|
||||
"vt_order_id": o.get("vt_order_id") or o.get("order_id"),
|
||||
"can_cancel_order": is_trading_session(),
|
||||
"cancel_allowed": is_trading_session(),
|
||||
**_symbol_display_fields(sym),
|
||||
@@ -568,14 +616,50 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def _canonical_position_key(symbol: str, direction: str) -> str:
|
||||
def _canonical_position_key(symbol: str, direction: str, exchange: str = "") -> str:
|
||||
sym = (symbol or "").strip()
|
||||
d = (direction or "long").strip().lower()
|
||||
ex = (exchange or "").strip().upper()
|
||||
try:
|
||||
vnpy_sym, _ = ths_to_vnpy_symbol(sym)
|
||||
return f"{vnpy_sym.lower()}:{d}"
|
||||
vnpy_sym, ex2 = ths_to_vnpy_symbol(sym)
|
||||
sym = vnpy_sym
|
||||
if not ex:
|
||||
ex = ex2
|
||||
except Exception:
|
||||
return f"{sym.lower()}:{d}"
|
||||
sym = sym.lower()
|
||||
return position_key(ex, sym, d)
|
||||
|
||||
def _position_key_from_ctp(p: dict) -> str:
|
||||
return position_key(
|
||||
p.get("exchange") or "",
|
||||
p.get("symbol") or "",
|
||||
p.get("direction") or "long",
|
||||
)
|
||||
|
||||
def _monitor_position_key(mon: dict, exchange: str = "") -> str:
|
||||
sym = (mon.get("symbol") or "").strip()
|
||||
d = (mon.get("direction") or "long").strip().lower()
|
||||
ex = (exchange or "").strip().upper()
|
||||
try:
|
||||
vnpy_sym, ex2 = ths_to_vnpy_symbol(sym)
|
||||
sym = vnpy_sym
|
||||
if not ex:
|
||||
ex = ex2
|
||||
except Exception:
|
||||
sym = sym.lower()
|
||||
return position_key(ex, sym, d)
|
||||
|
||||
def _monitors_by_position_key(conn) -> dict[str, dict]:
|
||||
ensure_monitor_order_columns(conn)
|
||||
out: dict[str, dict] = {}
|
||||
for r in conn.execute(
|
||||
"SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC"
|
||||
).fetchall():
|
||||
mon = dict(r)
|
||||
pk = _monitor_position_key(mon)
|
||||
if pk not in out:
|
||||
out[pk] = mon
|
||||
return out
|
||||
|
||||
def _find_active_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
|
||||
direction = (direction or "long").strip().lower()
|
||||
@@ -589,6 +673,51 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
return row
|
||||
return None
|
||||
|
||||
def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
|
||||
"""柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。"""
|
||||
direction = (direction or "long").strip().lower()
|
||||
for r in conn.execute(
|
||||
"SELECT * FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 40"
|
||||
).fetchall():
|
||||
row = dict(r)
|
||||
if (row.get("direction") or "long") != direction:
|
||||
continue
|
||||
if not _match_ctp_symbol(symbol, row.get("symbol") or ""):
|
||||
continue
|
||||
if int(row.get("lots") or 0) <= 0:
|
||||
continue
|
||||
conn.execute(
|
||||
"UPDATE trade_order_monitors SET status='active' WHERE id=?",
|
||||
(row["id"],),
|
||||
)
|
||||
row["status"] = "active"
|
||||
logger.info(
|
||||
"恢复误关闭监控 id=%s sym=%s dir=%s",
|
||||
row.get("id"), row.get("symbol"), direction,
|
||||
)
|
||||
return row
|
||||
return None
|
||||
|
||||
def _find_or_revive_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
|
||||
active = _find_active_monitor(conn, symbol, direction)
|
||||
if active:
|
||||
return active
|
||||
return _revive_closed_monitor(conn, symbol, direction)
|
||||
|
||||
def _close_all_monitors_for_sym_dir(conn, symbol: str, direction: str) -> None:
|
||||
direction = (direction or "long").strip().lower()
|
||||
for r in conn.execute(
|
||||
"SELECT id, symbol, direction FROM trade_order_monitors "
|
||||
"WHERE status IN ('active', 'pending')"
|
||||
).fetchall():
|
||||
if (r["direction"] or "long") != direction:
|
||||
continue
|
||||
if _match_ctp_symbol(symbol, r["symbol"] or ""):
|
||||
conn.execute(
|
||||
"UPDATE trade_order_monitors SET status='closed' WHERE id=?",
|
||||
(r["id"],),
|
||||
)
|
||||
|
||||
def _close_duplicate_monitors(conn, symbol: str, direction: str, keep_id: int) -> None:
|
||||
direction = (direction or "long").strip().lower()
|
||||
for r in conn.execute(
|
||||
@@ -950,18 +1079,15 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
"source": "monitor",
|
||||
"monitor_id": mon["id"] if mon else None,
|
||||
})
|
||||
row_key = _canonical_position_key(sym, direction)
|
||||
ctp_st = ctp_status(mode)
|
||||
sync_pending = (
|
||||
mon is not None
|
||||
and ctp is None
|
||||
and bool(ctp_st.get("connected"))
|
||||
row_key = _canonical_position_key(
|
||||
sym, direction, (ctp or {}).get("exchange") or "",
|
||||
)
|
||||
return {
|
||||
"key": row_key,
|
||||
"source": "ctp" if ctp else "local",
|
||||
"position_key": row_key,
|
||||
"source": "ctp",
|
||||
"source_label": source_label,
|
||||
"sync_pending": sync_pending,
|
||||
"sync_pending": False,
|
||||
"monitor_id": mon["id"] if mon else None,
|
||||
"symbol_code": sym,
|
||||
**_symbol_display_fields(sym),
|
||||
@@ -1073,6 +1199,69 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
"trailing_r_locked": int(mon.get("trailing_r_locked") or 0),
|
||||
}
|
||||
|
||||
def _compose_ctp_open_order_row(
|
||||
o: dict,
|
||||
*,
|
||||
mode: str,
|
||||
capital: float,
|
||||
now_iso: str,
|
||||
) -> Optional[dict]:
|
||||
offset_u = (o.get("offset") or "").upper()
|
||||
if offset_u and "OPEN" not in offset_u:
|
||||
return None
|
||||
sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "").strip()
|
||||
direction = (o.get("direction") or "long").strip().lower()
|
||||
lots = int(o.get("lots") or 0)
|
||||
if not sym or lots <= 0:
|
||||
return None
|
||||
order_price = float(o.get("price") or 0)
|
||||
pos_metrics = calc_position_metrics(
|
||||
direction, order_price, order_price, order_price, lots, order_price, capital, sym,
|
||||
)
|
||||
timeout_sec = get_pending_order_timeout_sec(get_setting)
|
||||
return {
|
||||
"key": f"{_canonical_position_key(sym, direction)}:pending:ctp:{o.get('order_id') or ''}",
|
||||
"order_state": "pending",
|
||||
"source": "ctp",
|
||||
"source_label": "委托挂单",
|
||||
"sync_pending": True,
|
||||
"monitor_id": None,
|
||||
"order_id": o.get("order_id"),
|
||||
"vt_order_id": o.get("vt_order_id") or o.get("order_id"),
|
||||
"symbol_code": sym,
|
||||
**_symbol_display_fields(sym),
|
||||
"direction": direction,
|
||||
"direction_label": "做多" if direction == "long" else "做空",
|
||||
"lots": lots,
|
||||
"entry_price": order_price,
|
||||
"order_price": order_price,
|
||||
"stop_loss": None,
|
||||
"take_profit": None,
|
||||
"open_time": now_iso,
|
||||
"holding_duration": None,
|
||||
"mark_price": order_price,
|
||||
"current_price": order_price,
|
||||
"margin": pos_metrics.get("margin"),
|
||||
"margin_source": "estimate",
|
||||
"position_pct": pos_metrics.get("position_pct"),
|
||||
"float_pnl": None,
|
||||
"est_fee": None,
|
||||
"can_close": False,
|
||||
"close_allowed": False,
|
||||
"can_cancel_order": is_trading_session(),
|
||||
"cancel_allowed": is_trading_session(),
|
||||
"pending_timeout_sec": timeout_sec,
|
||||
"pending_timeout_min": max(1, timeout_sec // 60),
|
||||
"sl_order_active": False,
|
||||
"tp_order_active": False,
|
||||
"sl_monitoring": False,
|
||||
"tp_monitoring": False,
|
||||
"can_place_orders": False,
|
||||
"pending_orders": [],
|
||||
"trailing_be": False,
|
||||
"trailing_r_locked": 0,
|
||||
}
|
||||
|
||||
def _reconcile_pending(conn, mode: str, *, capital: float = 0.0) -> None:
|
||||
reconcile_pending_orders(
|
||||
conn,
|
||||
@@ -1084,7 +1273,123 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
timeout_sec=get_pending_order_timeout_sec(get_setting),
|
||||
)
|
||||
|
||||
def _build_active_orders(
|
||||
conn,
|
||||
*,
|
||||
mode: str,
|
||||
capital: float,
|
||||
now_iso: str,
|
||||
) -> list[dict]:
|
||||
"""当前委托:以 CTP 柜台为准,本地 pending 开仓单合并展示。"""
|
||||
orders: list[dict] = []
|
||||
seen_keys: set[str] = set()
|
||||
|
||||
if ctp_status(mode).get("connected"):
|
||||
ctp_orders = trading_state.get_active_orders()
|
||||
if not ctp_orders:
|
||||
ctp_orders = _ctp_active_orders(mode)
|
||||
for o in ctp_orders:
|
||||
try:
|
||||
row = _compose_ctp_open_order_row(
|
||||
o, mode=mode, capital=capital, now_iso=now_iso,
|
||||
)
|
||||
if not row:
|
||||
row = _compose_ctp_order_row_any(
|
||||
o, mode=mode, capital=capital, now_iso=now_iso,
|
||||
)
|
||||
if row:
|
||||
orders.append(row)
|
||||
seen_keys.add(row.get("key") or "")
|
||||
except Exception as exc:
|
||||
logger.warning("compose ctp order row failed: %s", exc)
|
||||
|
||||
for r in conn.execute(
|
||||
"SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
|
||||
).fetchall():
|
||||
mon = dict(r)
|
||||
try:
|
||||
prow = _compose_pending_row(
|
||||
mon, mode=mode, capital=capital, now_iso=now_iso,
|
||||
)
|
||||
if prow and prow.get("key") not in seen_keys:
|
||||
pk = f"{prow.get('symbol_code') or ''}:{prow.get('direction') or ''}"
|
||||
dup = any(
|
||||
(x.get("symbol_code") or "") + ":" + (x.get("direction") or "") == pk
|
||||
and x.get("order_state") == "pending"
|
||||
for x in orders
|
||||
)
|
||||
if not dup:
|
||||
orders.append(prow)
|
||||
except Exception as exc:
|
||||
logger.warning("compose pending order row failed: %s", exc)
|
||||
return orders
|
||||
|
||||
def _compose_ctp_order_row_any(
|
||||
o: dict,
|
||||
*,
|
||||
mode: str,
|
||||
capital: float,
|
||||
now_iso: str,
|
||||
) -> Optional[dict]:
|
||||
"""CTP 任意未成交委托(含平仓)。"""
|
||||
sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "").strip()
|
||||
direction = (o.get("direction") or "long").strip().lower()
|
||||
lots = int(o.get("lots") or 0)
|
||||
if not sym or lots <= 0:
|
||||
return None
|
||||
offset_u = (o.get("offset") or "").upper()
|
||||
is_open = not offset_u or "OPEN" in offset_u
|
||||
order_price = float(o.get("price") or 0)
|
||||
pos_metrics = calc_position_metrics(
|
||||
direction, order_price, order_price, order_price, lots, order_price, capital, sym,
|
||||
)
|
||||
label = "开仓委托" if is_open else "平仓委托"
|
||||
timeout_sec = get_pending_order_timeout_sec(get_setting)
|
||||
ex = o.get("exchange") or ""
|
||||
pk = _canonical_position_key(sym, direction, ex)
|
||||
return {
|
||||
"key": f"{pk}:order:{o.get('order_id') or ''}",
|
||||
"order_state": "pending",
|
||||
"source": "ctp",
|
||||
"source_label": label,
|
||||
"sync_pending": False,
|
||||
"monitor_id": None,
|
||||
"order_id": o.get("order_id"),
|
||||
"vt_order_id": o.get("vt_order_id") or o.get("order_id"),
|
||||
"symbol_code": sym,
|
||||
**_symbol_display_fields(sym),
|
||||
"direction": direction,
|
||||
"direction_label": "做多" if direction == "long" else "做空",
|
||||
"lots": lots,
|
||||
"entry_price": order_price,
|
||||
"order_price": order_price,
|
||||
"stop_loss": None,
|
||||
"take_profit": None,
|
||||
"open_time": now_iso,
|
||||
"mark_price": order_price,
|
||||
"current_price": order_price,
|
||||
"margin": pos_metrics.get("margin"),
|
||||
"margin_source": "estimate",
|
||||
"position_pct": pos_metrics.get("position_pct"),
|
||||
"float_pnl": None,
|
||||
"can_close": False,
|
||||
"close_allowed": False,
|
||||
"can_cancel_order": is_trading_session(),
|
||||
"cancel_allowed": is_trading_session(),
|
||||
"pending_timeout_sec": timeout_sec if is_open else None,
|
||||
"pending_timeout_min": max(1, timeout_sec // 60) if is_open else None,
|
||||
"sl_order_active": False,
|
||||
"tp_order_active": False,
|
||||
"sl_monitoring": False,
|
||||
"tp_monitoring": False,
|
||||
"can_place_orders": False,
|
||||
"pending_orders": [],
|
||||
"trailing_be": False,
|
||||
"trailing_r_locked": 0,
|
||||
}
|
||||
|
||||
def _build_trading_live_rows(conn, *, fast: bool = False) -> list[dict]:
|
||||
"""当前持仓:以 CTP 为准,SQLite 仅叠加 SL/TP 元数据。"""
|
||||
from zoneinfo import ZoneInfo
|
||||
tz = ZoneInfo("Asia/Shanghai")
|
||||
now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M")
|
||||
@@ -1092,127 +1397,73 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
capital = _capital(conn)
|
||||
ensure_monitor_order_columns(conn)
|
||||
|
||||
monitors_raw = [
|
||||
dict(r) for r in conn.execute(
|
||||
"SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC"
|
||||
).fetchall()
|
||||
]
|
||||
monitor_by_key: dict[str, dict] = {}
|
||||
for mon in monitors_raw:
|
||||
key = _canonical_position_key(mon.get("symbol") or "", mon.get("direction") or "long")
|
||||
if key not in monitor_by_key:
|
||||
monitor_by_key[key] = mon
|
||||
|
||||
ctp_list: list[dict] = (
|
||||
_ctp_positions(mode, refresh_if_empty=not fast, refresh_margin=not fast)
|
||||
if ctp_status(mode).get("connected") else []
|
||||
)
|
||||
ctp_by_key: dict[str, dict] = {}
|
||||
for p in ctp_list:
|
||||
if int(p.get("lots") or 0) <= 0:
|
||||
continue
|
||||
key = _canonical_position_key(p.get("symbol") or "", p.get("direction") or "long")
|
||||
ctp_by_key[key] = p
|
||||
monitor_by_pk = _monitors_by_position_key(conn)
|
||||
ctp_list: list[dict] = []
|
||||
if ctp_status(mode).get("connected"):
|
||||
ctp_list = trading_state.get_positions()
|
||||
if not ctp_list:
|
||||
ctp_list = _ctp_positions(
|
||||
mode, refresh_if_empty=not fast, refresh_margin=not fast,
|
||||
)
|
||||
|
||||
rows: list[dict] = []
|
||||
used_ctp_keys: set[str] = set()
|
||||
|
||||
for key, mon in monitor_by_key.items():
|
||||
ctp = ctp_by_key.get(key)
|
||||
if not ctp:
|
||||
for ck, cp in ctp_by_key.items():
|
||||
if ck in used_ctp_keys:
|
||||
for p in ctp_list:
|
||||
lots = int(p.get("lots") or 0)
|
||||
if lots <= 0:
|
||||
continue
|
||||
pk = p.get("position_key") or _position_key_from_ctp(p)
|
||||
mon = monitor_by_pk.get(pk)
|
||||
if not mon:
|
||||
for mk, mv in monitor_by_pk.items():
|
||||
if (mv.get("direction") or "long") != (p.get("direction") or "long"):
|
||||
continue
|
||||
if (cp.get("direction") or "long") != (mon.get("direction") or "long"):
|
||||
continue
|
||||
if _match_ctp_symbol(cp.get("symbol") or "", mon.get("symbol") or ""):
|
||||
ctp = cp
|
||||
used_ctp_keys.add(ck)
|
||||
if _match_ctp_symbol(p.get("symbol") or "", mv.get("symbol") or ""):
|
||||
mon = mv
|
||||
break
|
||||
elif key in ctp_by_key:
|
||||
used_ctp_keys.add(key)
|
||||
if ctp and mon and not fast:
|
||||
ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "")
|
||||
if mon and not fast:
|
||||
_sync_monitor_from_ctp(
|
||||
conn, int(mon["id"]), mon.get("symbol") or "",
|
||||
mon.get("direction") or "long", mode, ctp=ctp,
|
||||
capital=capital,
|
||||
conn, int(mon["id"]), mon.get("symbol") or ths,
|
||||
mon.get("direction") or p.get("direction") or "long",
|
||||
mode, ctp=p, capital=capital,
|
||||
)
|
||||
mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon
|
||||
mon = _find_active_monitor(
|
||||
conn, mon.get("symbol") or ths, mon.get("direction") or "long",
|
||||
) or mon
|
||||
try:
|
||||
row = _compose_position_row(
|
||||
conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso,
|
||||
fast=fast,
|
||||
conn, mon=mon, ctp=p, mode=mode, capital=capital,
|
||||
now_iso=now_iso, fast=fast,
|
||||
)
|
||||
if row:
|
||||
rows.append(row)
|
||||
except Exception as exc:
|
||||
logger.warning("compose monitor row failed: %s", exc)
|
||||
|
||||
for key, ctp in ctp_by_key.items():
|
||||
if key in used_ctp_keys:
|
||||
continue
|
||||
matched = False
|
||||
for uk in used_ctp_keys:
|
||||
if uk == key:
|
||||
matched = True
|
||||
break
|
||||
if matched:
|
||||
continue
|
||||
for existing in rows:
|
||||
if _match_ctp_symbol(
|
||||
ctp.get("symbol") or "", existing.get("symbol_code") or "",
|
||||
) and (ctp.get("direction") or "long") == (existing.get("direction") or "long"):
|
||||
matched = True
|
||||
break
|
||||
if matched:
|
||||
continue
|
||||
mon = _find_active_monitor(
|
||||
conn, ctp.get("symbol") or "", ctp.get("direction") or "long",
|
||||
)
|
||||
try:
|
||||
row = _compose_position_row(
|
||||
conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso,
|
||||
fast=fast,
|
||||
)
|
||||
if row:
|
||||
rows.append(row)
|
||||
except Exception as exc:
|
||||
logger.warning("compose ctp row failed: %s", exc)
|
||||
logger.warning("compose ctp position row failed: %s", exc)
|
||||
|
||||
seen: set[str] = set()
|
||||
deduped: list[dict] = []
|
||||
for row in rows:
|
||||
rk = row.get("key") or f"{row.get('symbol_code')}:{row.get('direction')}"
|
||||
rk = row.get("key") or row.get("position_key") or ""
|
||||
if rk in seen:
|
||||
continue
|
||||
seen.add(rk)
|
||||
deduped.append(row)
|
||||
|
||||
pending_raw = [
|
||||
dict(r) for r in conn.execute(
|
||||
"SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
|
||||
).fetchall()
|
||||
]
|
||||
for mon in pending_raw:
|
||||
try:
|
||||
prow = _compose_pending_row(
|
||||
mon, mode=mode, capital=capital, now_iso=now_iso,
|
||||
)
|
||||
if prow:
|
||||
deduped.insert(0, prow)
|
||||
except Exception as exc:
|
||||
logger.warning("compose pending row failed: %s", exc)
|
||||
return deduped
|
||||
|
||||
def _build_trading_live_payload(conn, *, fast: bool = False) -> dict:
|
||||
from zoneinfo import ZoneInfo
|
||||
tz = ZoneInfo("Asia/Shanghai")
|
||||
now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M")
|
||||
mode = get_trading_mode(get_setting)
|
||||
ctp_st = ctp_status(mode)
|
||||
capital = _capital(conn)
|
||||
if not fast and ctp_st.get("connected"):
|
||||
if ctp_st.get("connected") and not fast:
|
||||
_reconcile_pending(conn, mode, capital=capital)
|
||||
if ctp_st.get("connected"):
|
||||
_ensure_monitors_from_ctp(conn, mode)
|
||||
_sync_trade_monitors_with_ctp(conn, mode)
|
||||
rows = _build_trading_live_rows(conn, fast=fast)
|
||||
active_orders = _build_active_orders(
|
||||
conn, mode=mode, capital=capital, now_iso=now_iso,
|
||||
)
|
||||
rows = _apply_account_margin_to_rows(rows, mode, capital)
|
||||
_persist_ctp_snapshot_to_monitors(conn, rows, mode)
|
||||
pending_orders = _build_pending_orders(conn, mode)
|
||||
@@ -1220,6 +1471,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
return {
|
||||
"ok": True,
|
||||
"rows": rows,
|
||||
"active_orders": active_orders,
|
||||
"pending_orders": pending_orders,
|
||||
"capital": capital,
|
||||
"ctp_status": ctp_st,
|
||||
@@ -1227,16 +1479,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
"risk_status": risk,
|
||||
"trading_session": is_trading_session(),
|
||||
"pending_order_timeout_min": get_pending_order_timeout_min(get_setting),
|
||||
"sync_state": trading_state.sync_state,
|
||||
"sync_label": trading_state.sync_label(),
|
||||
}
|
||||
|
||||
def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict:
|
||||
mode = get_trading_mode(get_setting)
|
||||
if not fast and ctp_status(mode).get("connected"):
|
||||
try:
|
||||
with _ctp_td_lock:
|
||||
get_bridge().refresh_positions()
|
||||
except Exception as exc:
|
||||
logger.debug("refresh positions before snapshot: %s", exc)
|
||||
if ctp_status(mode).get("connected"):
|
||||
if not fast:
|
||||
try:
|
||||
with _ctp_td_lock:
|
||||
get_bridge().calibrate_trading_state()
|
||||
except Exception as exc:
|
||||
logger.debug("refresh calibrate: %s", exc)
|
||||
for p in trading_state.get_positions() or _ctp_positions(mode, refresh_if_empty=False):
|
||||
ths = _ctp_pos_to_ths_code(p)
|
||||
if ths:
|
||||
try:
|
||||
get_bridge().subscribe_symbol(ths)
|
||||
except Exception:
|
||||
pass
|
||||
conn = get_db()
|
||||
try:
|
||||
init_strategy_tables(conn)
|
||||
@@ -1246,7 +1508,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _push_position_snapshot_async(*, fast: bool = False) -> None:
|
||||
def _push_position_snapshot_async(*, fast: bool = True) -> None:
|
||||
def _run() -> None:
|
||||
try:
|
||||
payload = _refresh_trading_live_snapshot(fast=fast)
|
||||
@@ -1256,15 +1518,43 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
|
||||
threading.Thread(target=_run, daemon=True).start()
|
||||
|
||||
def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None:
|
||||
from sl_tp_guard import check_sl_tp_on_tick
|
||||
from db_conn import DB_PATH, connect_db
|
||||
|
||||
mode = get_trading_mode(get_setting)
|
||||
if not ctp_status(mode).get("connected"):
|
||||
return
|
||||
conn = connect_db(DB_PATH)
|
||||
try:
|
||||
_init_tables(conn)
|
||||
capital = _capital(conn)
|
||||
n = check_sl_tp_on_tick(
|
||||
conn, mode, exchange, symbol, price,
|
||||
capital=capital, notify_fn=send_wechat_msg,
|
||||
be_tick_mult=get_trailing_be_tick_buffer(get_setting),
|
||||
)
|
||||
if n:
|
||||
conn.commit()
|
||||
_push_position_snapshot_async(fast=True)
|
||||
except Exception as exc:
|
||||
logger.debug("tick sl/tp: %s", exc)
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
def _bootstrap_trading_runtime() -> None:
|
||||
"""进程启动:立刻读库展示持仓,并异步连 CTP。"""
|
||||
"""进程启动:读 CTP 快照推送,事件驱动增量 + 定期全量校准。"""
|
||||
set_position_refresh_callback(
|
||||
lambda: _push_position_snapshot_async(fast=False)
|
||||
lambda: _push_position_snapshot_async(fast=True)
|
||||
)
|
||||
set_tick_sl_tp_callback(_on_tick_sl_tp)
|
||||
|
||||
def _warm() -> None:
|
||||
try:
|
||||
payload = _refresh_trading_live_snapshot(fast=True)
|
||||
mode = get_trading_mode(get_setting)
|
||||
if ctp_status(mode).get("connected"):
|
||||
get_bridge().calibrate_trading_state()
|
||||
payload = _refresh_trading_live_snapshot(fast=False)
|
||||
position_hub.set_snapshot(payload)
|
||||
position_hub.broadcast("positions", payload)
|
||||
except Exception as exc:
|
||||
@@ -1346,6 +1636,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
try:
|
||||
init_strategy_tables(conn)
|
||||
payload = _build_trading_live_payload(conn, fast=True)
|
||||
conn.commit()
|
||||
position_hub.set_snapshot(payload)
|
||||
return jsonify(payload)
|
||||
finally:
|
||||
@@ -1656,12 +1947,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
symbol_name=(mon.get("symbol_name") or "") if mon else "",
|
||||
market_code=(mon.get("market_code") or "") if mon else "",
|
||||
)
|
||||
if mid:
|
||||
_close_duplicate_monitors(conn, sym, direction, mid)
|
||||
conn.execute(
|
||||
"UPDATE trade_order_monitors SET status='closed' WHERE id=?",
|
||||
(mid,),
|
||||
)
|
||||
_close_all_monitors_for_sym_dir(conn, sym, direction)
|
||||
conn.commit()
|
||||
try:
|
||||
from ctp_trade_sync import sync_trade_logs_from_ctp
|
||||
@@ -2449,26 +2735,34 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
||||
interval=1,
|
||||
)
|
||||
_pos_refresh_tick = {"n": 0}
|
||||
_last_full_calibrate = {"ts": 0.0}
|
||||
|
||||
def _position_worker_refresh() -> dict:
|
||||
import time as _time
|
||||
from ctp_trading_state import CALIBRATE_INTERVAL_SEC
|
||||
|
||||
_pos_refresh_tick["n"] += 1
|
||||
mode = get_trading_mode(get_setting)
|
||||
connected = bool(ctp_status(mode).get("connected"))
|
||||
# 已连接时每 2 秒完整对账;未连接时每 5 秒轻量刷新(禁止 query_position)
|
||||
if connected:
|
||||
use_fast = _pos_refresh_tick["n"] % 2 != 0
|
||||
else:
|
||||
use_fast = _pos_refresh_tick["n"] % 5 != 0
|
||||
payload = _refresh_trading_live_snapshot(fast=use_fast)
|
||||
if connected and use_fast and any(
|
||||
r.get("sync_pending") for r in (payload.get("rows") or [])
|
||||
):
|
||||
now = _time.time()
|
||||
need_full = (
|
||||
connected
|
||||
and (
|
||||
trading_state.needs_calibrate()
|
||||
or (now - _last_full_calibrate["ts"]) >= CALIBRATE_INTERVAL_SEC
|
||||
)
|
||||
)
|
||||
if need_full:
|
||||
_last_full_calibrate["ts"] = now
|
||||
payload = _refresh_trading_live_snapshot(fast=False)
|
||||
else:
|
||||
payload = _refresh_trading_live_snapshot(fast=True)
|
||||
return payload
|
||||
|
||||
start_position_worker(
|
||||
refresh_fn=_position_worker_refresh,
|
||||
interval=1,
|
||||
interval=2,
|
||||
idle_interval=5,
|
||||
)
|
||||
_bootstrap_trading_runtime()
|
||||
start_ctp_fee_worker(
|
||||
|
||||
Reference in New Issue
Block a user