Document Git-only deploy workflow and reduce positions page IPC blocking.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-01 14:05:37 +08:00
parent 7748a88219
commit b354d6c701
6 changed files with 247 additions and 65 deletions
+76 -50
View File
@@ -154,6 +154,65 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"""注册交易相关路由。"""
_nav = require_nav
_live_refresh_lock = threading.Lock()
_ctp_status_cache: dict = {"mode": "", "status": {}, "ts": 0.0}
_ctp_status_cache_lock = threading.Lock()
_ctp_status_refresh_flag = {"busy": False}
def _remember_ctp_status(mode: str, st: dict) -> None:
if not isinstance(st, dict) or not st:
return
with _ctp_status_cache_lock:
_ctp_status_cache["mode"] = mode
_ctp_status_cache["status"] = dict(st)
_ctp_status_cache["ts"] = time.time()
def _schedule_ctp_status_refresh(mode: str) -> None:
with _ctp_status_cache_lock:
if _ctp_status_refresh_flag["busy"]:
return
_ctp_status_refresh_flag["busy"] = True
def _run() -> None:
try:
st = dict(ctp_status(mode) or {})
_remember_ctp_status(mode, st)
snap = position_hub.get_snapshot()
if snap:
merged = dict(snap)
merged["ctp_status"] = st
position_hub.set_snapshot(merged)
except Exception as exc:
logger.debug("ctp status refresh: %s", exc)
finally:
with _ctp_status_cache_lock:
_ctp_status_refresh_flag["busy"] = False
threading.Thread(
target=_run,
daemon=True,
name="ctp-status-refresh",
).start()
def _cached_ctp_status(mode: str) -> dict:
"""页面/SSE 优先读快照与内存缓存,避免同步 worker IPC 阻塞 HTTP 线程。"""
try:
snap = position_hub.get_snapshot() or {}
st = snap.get("ctp_status")
if isinstance(st, dict) and st:
_remember_ctp_status(mode, st)
return dict(st)
except Exception:
pass
with _ctp_status_cache_lock:
if _ctp_status_cache["mode"] == mode and _ctp_status_cache["status"]:
return dict(_ctp_status_cache["status"])
_schedule_ctp_status_refresh(mode)
return {
"connected": False,
"connecting": True,
"last_error": "",
"mode_label": trading_mode_label(get_setting),
}
def _sizing_mode_label(mode: str) -> str:
m = normalize_sizing_mode(mode)
@@ -217,7 +276,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
get_fixed_lots_fn=lambda: get_fixed_lots(get_setting),
)
def _recommend_payload(conn) -> dict:
def _recommend_payload(conn, *, use_ctp_margin: bool = True) -> dict:
mode = get_trading_mode(get_setting)
return recommend_payload(
conn,
@@ -226,6 +285,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
trading_mode=mode,
sizing_mode=get_sizing_mode(get_setting),
fixed_lots=get_fixed_lots(get_setting),
use_ctp_margin=use_ctp_margin,
)
def _recommend_capital(conn) -> float:
@@ -2011,6 +2071,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M")
mode = get_trading_mode(get_setting)
ctp_st = ctp_status(mode)
_remember_ctp_status(mode, ctp_st)
capital = _capital(conn)
if ctp_st.get("connected") and not fast:
_reconcile_pending(conn, mode, capital=capital)
@@ -2061,45 +2122,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
}
def _minimal_live_payload(conn) -> dict:
"""CTP 直出兜底:不跑对账/写库,避免与后台 worker 争锁"""
from zoneinfo import ZoneInfo
tz = ZoneInfo("Asia/Shanghai")
now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M")
"""零 IPC 兜底:仅读库 + 缓存 CTP 状态,持仓由后台 worker 补全"""
mode = get_trading_mode(get_setting)
ctp_st = ctp_status(mode)
ctp_st = _cached_ctp_status(mode)
capital = _capital(conn)
rows: list[dict] = []
if ctp_st.get("connected"):
for p in _ctp_positions(mode, refresh_if_empty=False):
lots = int(p.get("lots") or 0)
if lots <= 0:
continue
ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "")
direction = p.get("direction") or "long"
mon = {"symbol": ths, "direction": direction}
try:
row = _compose_position_row(
conn,
mon=mon,
ctp=p,
mode=mode,
capital=capital,
now_iso=now_iso,
fast=True,
)
if row:
rows.append(row)
except Exception as exc:
logger.warning("minimal live row failed: %s", exc)
risk = get_risk_status(
conn,
active_count=_effective_active_position_count(conn, mode),
active_count=count_active_trade_monitors(conn),
equity=capital,
)
syncing = bool(ctp_st.get("connected") or ctp_st.get("connecting"))
return {
"ok": True,
"rows": rows,
"rows": [],
"active_orders": [],
"pending_orders": [],
"capital": capital,
@@ -2110,8 +2145,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"night_session": is_night_trading_session(),
"session_clock": trading_session_clock(),
"pending_order_timeout_min": get_pending_order_timeout_min(get_setting),
"sync_state": trading_state.sync_state,
"sync_label": trading_state.sync_label(),
"sync_state": "syncing" if syncing else trading_state.sync_state,
"sync_label": "加载中…" if syncing else trading_state.sync_label(),
}
def _normalize_live_payload(payload: dict) -> dict:
@@ -2445,10 +2480,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
conn.commit()
sizing = get_sizing_mode(get_setting)
max_pct = get_max_margin_pct(get_setting)
rec_cache = _recommend_payload(conn)
rec_cache = _recommend_payload(conn, use_ctp_margin=False)
if rec_cache.get("needs_refresh"):
_schedule_recommend_refresh()
ctp_connected = is_ctp_connected(get_setting)
ctp_connected = connected
margin_rec = small_account_margin_recommendations()
if not bootstrap_live:
bootstrap_live = {
@@ -2529,13 +2564,13 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def api_trading_stream():
from queue import Empty
@stream_with_context
def generate():
yield ": stream\n\n"
q = position_hub.subscribe()
try:
snap = position_hub.get_snapshot()
if snap:
yield sse_format("positions", snap)
else:
if not snap:
conn = get_db()
try:
init_strategy_tables(conn)
@@ -2545,6 +2580,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
position_hub.set_snapshot(payload)
yield sse_format("positions", payload)
_push_position_snapshot_async(fast=True)
else:
yield sse_format("positions", snap)
while True:
try:
msg = q.get(timeout=25)
@@ -2866,17 +2903,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def _roll_ui_modes():
return frozenset({ADD_MODE_MARKET, ADD_MODE_BREAKOUT})
def _cached_ctp_status(mode: str) -> dict:
"""页面渲染优先读持仓快照里的 CTP 状态,避免每次打 worker IPC。"""
try:
snap = position_hub.get_snapshot() or {}
st = snap.get("ctp_status")
if isinstance(st, dict) and st:
return dict(st)
except Exception:
pass
return dict(ctp_status(mode) or {})
def _roll_filled_lots_map(conn, group_ids: list[int]) -> dict[int, int]:
if not group_ids:
return {}