fix(trend): surface DCA block reasons and ensure gate_bot poll thread

Log poll exceptions, diagnose live-trading and mark-price blocks on the trend page, start background monitors on app import, and add /api/trend_poll_status for debugging.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-07 17:46:26 +08:00
parent 0760873d9d
commit f5b4513ddb
3 changed files with 213 additions and 7 deletions
+134 -2
View File
@@ -41,6 +41,109 @@ MONITOR_TYPE_TREND = MONITOR_TYPE_TREND_PULLBACK
_TREND_FLAT_STREAK: dict[int, int] = {}
TREND_FLAT_CONFIRM_POLLS = max(1, int(os.getenv("TREND_FLAT_CONFIRM_POLLS", "5")))
TREND_OPEN_GRACE_SEC = max(0, int(os.getenv("TREND_OPEN_GRACE_SEC", "180")))
_TREND_LIVE_SKIP_LOG_TS = 0.0
_TREND_POLL_STATE: dict[str, Any] = {
"updated_at": None,
"live_ok": True,
"live_reason": "",
"plans": {},
}
def get_trend_poll_state() -> dict:
return dict(_TREND_POLL_STATE or {})
def _log_trend_live_skip(reason: str) -> None:
global _TREND_LIVE_SKIP_LOG_TS
now = time.time()
if now - _TREND_LIVE_SKIP_LOG_TS < 60:
return
_TREND_LIVE_SKIP_LOG_TS = now
print(f"[trend_pullback] poll skipped (live not ready): {reason}", flush=True)
def _set_trend_poll_plan(plan_id: int, info: dict) -> None:
plans = dict(_TREND_POLL_STATE.get("plans") or {})
plans[str(plan_id)] = info
_TREND_POLL_STATE["plans"] = plans
def summarize_trend_dca_probe(cfg: dict, row) -> dict:
"""诊断单计划为何未补仓(供页面 / API)。"""
m = _m(cfg)
d = _row(cfg, row)
plan_id = int(d.get("id") or 0)
sym = d.get("symbol") or ""
direction = (d.get("direction") or "long").lower()
ex_sym = d.get("exchange_symbol") or m.normalize_exchange_symbol(sym)
out: dict[str, Any] = {
"plan_id": plan_id,
"symbol": sym,
"mark_price": None,
"next_trigger": None,
"trigger_reached": False,
"legs_done": int(d.get("legs_done") or 0),
"first_order_done": int(d.get("first_order_done") or 0),
"block_reason": None,
}
try:
legs_done = int(d.get("legs_done") or 0)
grid = json.loads(d.get("grid_prices_json") or "[]")
if not isinstance(grid, list):
grid = []
leg_amounts = json.loads(d.get("leg_amounts_json") or "[]")
if not isinstance(leg_amounts, list):
leg_amounts = []
except Exception:
grid = []
leg_amounts = []
legs_done = 0
pf = _trend_poll_price(m, sym, ex_sym, direction)
out["mark_price"] = pf
ok_live, live_reason = m.ensure_exchange_live_ready()
out["live_ok"] = ok_live
if not ok_live:
out["block_reason"] = live_reason or "实盘未就绪"
if not int(d.get("first_order_done") or 0):
out["block_reason"] = out["block_reason"] or "首仓未完成"
return out
if legs_done >= len(grid) or legs_done >= len(leg_amounts):
out["block_reason"] = out["block_reason"] or "补仓档已全部完成或无 grid"
return out
try:
level = float(grid[legs_done])
except (TypeError, ValueError, IndexError):
out["block_reason"] = out["block_reason"] or "无效补仓触发价"
return out
out["next_trigger"] = level
if pf is None:
out["block_reason"] = out["block_reason"] or "无法读取标记价"
return out
reached = trend_dca_level_reached(direction, float(pf), level)
out["trigger_reached"] = reached
if reached and not ok_live:
out["block_reason"] = live_reason or "LIVE_TRADING_ENABLED=false"
elif reached and ok_live:
pos = m.get_live_position_contracts(ex_sym, direction)
try:
local_open = float(d.get("order_amount_open") or 0)
except (TypeError, ValueError):
local_open = 0.0
if pos is None and local_open > 0:
pos = local_open
if pos is None:
out["block_reason"] = "无法读取交易所持仓"
elif float(pos) <= 0:
out["block_reason"] = "交易所无持仓"
else:
out["block_reason"] = (
"标记价已触达,轮询应自动下单;若仍未补请确认 PM2 进程 crypto_gate_bot "
"(非 manual-agent-gate-bot)在运行,并查看 pm2 logs crypto_gate_bot"
)
elif not reached:
out["block_reason"] = f"标记价 {pf} 未触达下一档 {level}"
return out
def trend_add_zone_label(direction: str) -> str:
@@ -747,8 +850,24 @@ def _should_finalize_trend_flat(row, pos, plan_id: int, m) -> bool:
def check_trend_pullback_plans(cfg: dict) -> None:
m = _m(cfg)
ok_live, _ = m.ensure_exchange_live_ready()
ok_live, live_reason = m.ensure_exchange_live_ready()
_TREND_POLL_STATE["updated_at"] = time.time()
_TREND_POLL_STATE["live_ok"] = ok_live
_TREND_POLL_STATE["live_reason"] = live_reason or ""
if not ok_live:
_log_trend_live_skip(live_reason or "unknown")
conn = cfg["get_db"]()
try:
for row in conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active'"
).fetchall():
probe = summarize_trend_dca_probe(cfg, row)
if probe.get("trigger_reached"):
_set_trend_poll_plan(int(row["id"]), probe)
except Exception as e:
print(f"[trend_pullback] live-skip probe error: {e}", flush=True)
finally:
conn.close()
return
conn = cfg["get_db"]()
rows = conn.execute(
@@ -867,7 +986,20 @@ def check_trend_pullback_plans(cfg: dict) -> None:
"UPDATE trend_pullback_plans SET last_mark_price=? WHERE id=?",
(pf, row["id"]),
)
except Exception:
probe = summarize_trend_dca_probe(cfg, row)
probe["last_poll_mark"] = pf
_set_trend_poll_plan(plan_id, probe)
if probe.get("trigger_reached") and probe.get("block_reason"):
print(
f"[trend_pullback] dca blocked plan={plan_id} sym={sym} "
f"mark={pf} next={probe.get('next_trigger')} reason={probe.get('block_reason')}",
flush=True,
)
except Exception as e:
print(
f"[trend_pullback] poll error plan={row['id'] if row else '?'}: {e}",
flush=True,
)
continue
conn.commit()
conn.close()