Fix position flicker, drop futures cooloff, prioritize startup display.

Preserve trading state when CTP memory is empty, bootstrap equity/positions on page load, stabilize risk status from DB monitors, and remove app-layer manual close cooling periods.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-30 23:17:18 +08:00
parent 2386eca324
commit 1b3a7f1bdc
9 changed files with 218 additions and 153 deletions
+108 -65
View File
@@ -8,6 +8,7 @@ from __future__ import annotations
import json
import logging
import os
import threading
import time
from datetime import datetime
@@ -67,11 +68,11 @@ from sl_tp_guard import (
)
from risk.account_risk_lib import (
assert_can_open,
count_active_trade_monitors,
get_risk_status,
on_mood_journal_freeze,
on_user_initiated_close,
parse_mood_issues,
reduce_cooloff_after_journal,
trading_day_label,
)
from strategy.strategy_db import init_strategy_tables
@@ -701,12 +702,9 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
return reconcile_monitors_without_position(conn, mode)
def _effective_active_position_count(conn, mode: str) -> int:
if ctp_status(mode).get("connected"):
return len(_ctp_position_keys(mode))
row = conn.execute(
"SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='active'"
).fetchone()
return int(row["n"] or 0)
"""风控持仓数以本地 active 监控为准,不随 CTP 内存空窗抖动。"""
del mode
return count_active_trade_monitors(conn)
def _build_pending_orders(conn, mode: str) -> list[dict]:
pending: list[dict] = []
@@ -1672,15 +1670,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
ctp_list = _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False)
if not ctp_list:
ctp_list = trading_state.get_positions()
if not ctp_list:
try:
with _ctp_td_lock:
get_bridge().calibrate_trading_state()
except Exception as exc:
logger.debug("live calibrate: %s", exc)
ctp_list = trading_state.get_positions() or _ctp_positions(
mode, refresh_if_empty=False, refresh_margin=False,
)
rows: list[dict] = []
for p in ctp_list:
@@ -1742,6 +1731,9 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if not deduped and ctp_status(mode).get("connected") and monitor_by_pk:
margin_used = float(ctp_account_margin_used(mode) or 0)
has_active_mon = any(
int(m.get("lots") or 0) > 0 for m in monitor_by_pk.values()
)
since_connect = 9999.0
try:
since_connect = time.time() - float(
@@ -1749,7 +1741,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
)
except Exception:
pass
if margin_used > 100 or since_connect < 300:
if margin_used > 0 or has_active_mon or since_connect < 300:
for mon in monitor_by_pk.values():
lots = int(mon.get("lots") or 0)
if lots <= 0:
@@ -1849,6 +1841,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
init_strategy_tables(conn)
payload = _build_trading_live_payload(conn, fast=fast)
commit_retry(conn)
prev = position_hub.get_snapshot()
if (
prev
and ctp_status(mode).get("connected")
and not (payload.get("rows") or [])
and (prev.get("rows") or [])
):
margin_used = float(ctp_account_margin_used(mode) or 0)
if margin_used > 0 or trading_state.sync_state == "syncing":
payload = dict(payload)
payload["rows"] = prev["rows"]
payload["sync_state"] = "syncing"
payload["sync_label"] = "同步中…"
return payload
finally:
conn.close()
@@ -1956,8 +1961,22 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
finally:
conn.close()
def _prime_position_snapshot() -> None:
"""进程启动同步预热:优先写入持仓/权益快照,页面打开即可读。"""
try:
payload = _refresh_trading_live_snapshot(fast=True)
position_hub.set_snapshot(payload)
n = len(payload.get("rows") or [])
logger.info(
"持仓快照已预热 capital=%s rows=%d",
payload.get("capital"),
n,
)
except Exception as exc:
logger.warning("prime position snapshot: %s", exc)
def _bootstrap_trading_runtime() -> None:
"""进程启动:读 CTP 快照推送,事件驱动增量 + 定期全量校准"""
"""进程启动:并发预热持仓快照 + CTP 连接,不阻塞 HTTP 监听"""
set_position_refresh_callback(
lambda: _push_position_snapshot_async(fast=True)
)
@@ -1969,23 +1988,31 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
try:
mode = get_trading_mode(get_setting)
if ctp_status(mode).get("connected"):
get_bridge().calibrate_trading_state()
with _ctp_td_lock:
get_bridge().calibrate_trading_state()
payload = _refresh_trading_live_snapshot(fast=False)
position_hub.set_snapshot(payload)
position_hub.broadcast("positions", payload)
except Exception as exc:
logger.warning("bootstrap position snapshot: %s", exc)
threading.Thread(target=_warm, daemon=True, name="position-bootstrap").start()
try:
from ctp_premarket_connect import should_auto_connect_now
from vnpy_bridge import ctp_start_connect
def _start_ctp() -> None:
try:
from ctp_premarket_connect import should_auto_connect_now
from vnpy_bridge import ctp_start_connect
if should_auto_connect_now():
mode = get_trading_mode(get_setting)
ctp_start_connect(mode, force=False, scheduled=True)
except Exception as exc:
logger.debug("bootstrap ctp connect: %s", exc)
if should_auto_connect_now():
mode = get_trading_mode(get_setting)
ctp_start_connect(mode, force=False, scheduled=True)
except Exception as exc:
logger.debug("bootstrap ctp connect: %s", exc)
from concurrent.futures import ThreadPoolExecutor
workers = max(2, int(os.getenv("QIHUO_STARTUP_WORKERS", "8") or 8))
with ThreadPoolExecutor(max_workers=min(workers, 4), thread_name_prefix="boot") as pool:
pool.submit(_warm)
pool.submit(_start_ctp)
def _on_ctp_connected(mode: str) -> None:
if mode != get_trading_mode(get_setting):
@@ -2050,6 +2077,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
_schedule_recommend_refresh()
ctp_connected = is_ctp_connected(get_setting)
margin_rec = small_account_margin_recommendations()
bootstrap_live: dict = {}
try:
bootstrap_live = _build_trading_live_payload(conn, fast=True)
position_hub.set_snapshot(bootstrap_live)
except Exception as exc:
logger.debug("positions page bootstrap: %s", exc)
return render_template(
"trade.html",
trading_mode=mode,
@@ -2083,6 +2116,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
session_clock=trading_session_clock(),
roll_max_margin_pct=get_roll_max_margin_pct(get_setting),
product_categories=PRODUCT_CATEGORIES,
bootstrap_live=bootstrap_live,
)
finally:
conn.close()
@@ -3836,8 +3870,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def hook_review_mood(conn, behavior_tags: str, exit_trigger: str, exit_supplement: str):
if parse_mood_issues(behavior_tags):
on_mood_journal_freeze(conn, trading_day=trading_day_label())
if (exit_trigger or "").strip() == "手动平仓" and (exit_supplement or "").strip():
reduce_cooloff_after_journal(conn, trading_day=trading_day_label())
app._risk_review_hook = hook_review_mood
@@ -3846,33 +3878,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def _init_tables(conn):
init_strategy_tables(conn)
start_recommend_worker(
db_path=DB_PATH,
get_capital_fn=_recommend_capital,
quote_fn=_main_quote,
init_tables_fn=_init_tables,
get_mode_fn=lambda: get_trading_mode(get_setting),
get_max_margin_pct_fn=lambda: get_max_margin_pct(get_setting),
get_sizing_mode_fn=lambda: get_sizing_mode(get_setting),
get_fixed_lots_fn=lambda: get_fixed_lots(get_setting),
)
start_ctp_reconnect_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
)
start_ctp_premarket_connect_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
)
start_sl_tp_guard_worker(
db_path=DB_PATH,
get_mode_fn=lambda: get_trading_mode(get_setting),
init_tables_fn=_init_tables,
get_capital_fn=_capital,
get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting),
notify_fn=send_wechat_msg,
interval=1,
)
_prime_position_snapshot()
_pos_refresh_tick = {"n": 0}
_last_full_calibrate = {"ts": 0.0}
@@ -3904,18 +3911,22 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
idle_interval=3,
)
_bootstrap_trading_runtime()
start_ctp_fee_worker(
start_ctp_reconnect_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
set_setting_fn=set_setting,
)
from ai_worker import start_ai_worker
start_ai_worker(
db_path=DB_PATH,
start_ctp_premarket_connect_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
set_setting_fn=set_setting,
send_wechat_fn=send_wechat_msg,
)
start_sl_tp_guard_worker(
db_path=DB_PATH,
get_mode_fn=lambda: get_trading_mode(get_setting),
init_tables_fn=_init_tables,
get_capital_fn=_capital,
get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting),
notify_fn=send_wechat_msg,
interval=1,
)
start_pending_order_worker(
db_path=DB_PATH,
@@ -3925,3 +3936,35 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
reconcile_fn=_reconcile_pending,
on_changed_fn=lambda: _push_position_snapshot_async(fast=False),
)
def _start_deferred_workers() -> None:
time.sleep(2)
start_recommend_worker(
db_path=DB_PATH,
get_capital_fn=_recommend_capital,
quote_fn=_main_quote,
init_tables_fn=_init_tables,
get_mode_fn=lambda: get_trading_mode(get_setting),
get_max_margin_pct_fn=lambda: get_max_margin_pct(get_setting),
get_sizing_mode_fn=lambda: get_sizing_mode(get_setting),
get_fixed_lots_fn=lambda: get_fixed_lots(get_setting),
)
start_ctp_fee_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
set_setting_fn=set_setting,
)
from ai_worker import start_ai_worker
start_ai_worker(
db_path=DB_PATH,
get_setting_fn=get_setting,
set_setting_fn=set_setting,
send_wechat_fn=send_wechat_msg,
)
threading.Thread(
target=_start_deferred_workers,
daemon=True,
name="deferred-workers",
).start()