Files
crypto_monitor/strategy_roll_monitor_lib.py
dekun d467760d5c 顺势加仓 v2:程序监控滚仓、文档页与平仓同步
重写滚仓计仓与四种加仓方式(市价/斐波/突破),程序盯 mark 触价成交;风险读监控单;pending 可删不可改;手动平仓同步结束滚仓。新增 /strategy/roll/docs 说明页与顺势加仓滚仓说明.md。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-26 22:03:23 +08:00

521 lines
17 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""滚仓程序监控:斐波/突破触价市价成交、失效、外部平仓同步(各所共用)。"""
from __future__ import annotations
from typing import Any, Optional
from strategy_roll_lib import (
BREAKOUT_MODE,
FIB_MODES,
MARKET_MODE,
mode_label,
roll_breakout_invalidate,
roll_breakout_trigger_crossed,
roll_fib_invalidate,
roll_fib_trigger_crossed,
calc_risk_budget_usdt,
max_roll_legs,
preview_roll,
solve_add_amount_for_total_risk,
)
from strategy_db import init_strategy_tables
ROLL_LEG_STATUS_LABELS = {
"pending": "监控中",
"filled": "已成交",
"cancelled": "已删除",
"invalidated": "已失效",
}
def roll_leg_status_label(status: Optional[str]) -> str:
s = (status or "").strip().lower()
return ROLL_LEG_STATUS_LABELS.get(s, status or "")
def check_roll_monitors(cfg: dict[str, Any]) -> None:
get_db = cfg["get_db"]
conn = get_db()
try:
init_strategy_tables(conn)
_reconcile_roll_groups(conn, cfg)
_check_pending_roll_legs(conn, cfg)
conn.commit()
except Exception:
try:
conn.rollback()
except Exception:
pass
finally:
try:
conn.close()
except Exception:
pass
def sync_roll_after_external_close(
cfg: dict, conn, symbol: str, direction: str, *, reason: str = "持仓已平"
) -> dict[str, Any]:
"""中控/实例手动平仓后:取消 pending 腿并关闭 active 滚仓组(保留 filled 历史)。"""
norm = cfg.get("normalize_symbol_input")
sym = norm(symbol) if callable(norm) else (symbol or "").strip()
if not sym:
return {"ok": False, "msg": "symbol 无效", "closed_groups": 0, "cancelled_legs": 0}
direction = (direction or "long").strip().lower()
init_strategy_tables(conn)
rows = conn.execute(
"""SELECT g.* FROM roll_groups g
WHERE g.status='active' AND g.symbol=? AND g.direction=?""",
(sym, direction),
).fetchall()
closed = cancelled = 0
for row in rows:
g = _row_dict(row)
cancelled += _cancel_pending_legs_for_group(conn, cfg, g, status="cancelled")
cur = conn.execute(
"UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
(_now(cfg), int(g["id"])),
)
if getattr(cur, "rowcount", 0):
closed += 1
try:
from strategy_wechat_notify import notify_roll_group_ended
notify_roll_group_ended(
cfg,
group_id=int(g["id"]),
symbol=sym,
direction=direction,
reason=reason,
leg_count=int(g.get("leg_count") or 0),
)
except Exception:
pass
try:
from strategy_snapshot_lib import save_roll_group_snapshot
save_roll_group_snapshot(cfg, conn, g, result_label="结束")
except Exception:
pass
return {
"ok": True,
"symbol": sym,
"direction": direction,
"closed_groups": closed,
"cancelled_legs": cancelled,
}
def cancel_roll_pending_leg(cfg: dict, conn, leg_id: int) -> tuple[bool, str]:
"""用户删除 pending 滚仓腿(不可修改,仅删除)。"""
init_strategy_tables(conn)
row = conn.execute(
"SELECT l.*, g.symbol, g.direction, g.status AS group_status FROM roll_legs l "
"INNER JOIN roll_groups g ON g.id = l.roll_group_id WHERE l.id=?",
(int(leg_id),),
).fetchone()
if not row:
return False, "滚仓腿不存在"
leg = _row_dict(row)
if (leg.get("status") or "").strip().lower() != "pending":
return False, "仅监控中的腿可删除"
_cancel_roll_leg_order(cfg, {"symbol": leg.get("symbol"), "exchange_symbol": leg.get("exchange_symbol")}, leg)
conn.execute(
"UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'",
(int(leg_id),),
)
conn.commit()
return True, "已删除滚仓监控"
def count_filled_roll_legs(conn, roll_group_id: int) -> int:
row = conn.execute(
"SELECT COUNT(*) FROM roll_legs WHERE roll_group_id=? AND status='filled'",
(int(roll_group_id),),
).fetchone()
return int(row[0] if row else 0)
def count_pending_roll_legs(conn, roll_group_id: int) -> int:
row = conn.execute(
"SELECT COUNT(*) FROM roll_legs WHERE roll_group_id=? AND status='pending'",
(int(roll_group_id),),
).fetchone()
return int(row[0] if row else 0)
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _now(cfg: dict) -> str:
fn = cfg.get("app_now_str")
return fn() if callable(fn) else ""
def _cancel_pending_legs_for_group(conn, cfg: dict, group: dict, *, status: str = "cancelled") -> int:
gid = int(group["id"])
n = 0
for leg in conn.execute(
"SELECT * FROM roll_legs WHERE roll_group_id=? AND status='pending'",
(gid,),
).fetchall():
ld = _row_dict(leg)
_cancel_roll_leg_order(cfg, group, ld)
conn.execute(
"UPDATE roll_legs SET status=? WHERE id=? AND status='pending'",
(status, ld["id"]),
)
n += 1
return n
def _close_roll_group(conn, cfg: dict, group: dict, *, reason: str = "下单监控已结案或交易所无同向持仓") -> None:
gid = int(group["id"])
_cancel_pending_legs_for_group(conn, cfg, group, status="cancelled")
cur = conn.execute(
"UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
(_now(cfg), gid),
)
if getattr(cur, "rowcount", 0):
try:
from strategy_wechat_notify import notify_roll_group_ended
notify_roll_group_ended(
cfg,
group_id=gid,
symbol=group.get("symbol") or "",
direction=group.get("direction") or "long",
reason=reason,
leg_count=int(group.get("leg_count") or 0),
)
except Exception:
pass
try:
from strategy_snapshot_lib import save_roll_group_snapshot
save_roll_group_snapshot(cfg, conn, group, result_label="结束")
except Exception:
pass
def _reconcile_roll_groups(conn, cfg: dict) -> None:
rows = conn.execute(
"""SELECT g.*, m.status AS monitor_status
FROM roll_groups g
LEFT JOIN order_monitors m ON m.id = g.order_monitor_id
WHERE g.status='active'"""
).fetchall()
for row in rows:
g = _row_dict(row)
symbol = g.get("symbol") or ""
direction = (g.get("direction") or "long").strip().lower()
ex_sym = g.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
mon_ok = (row["monitor_status"] or "").strip().lower() == "active"
pos = cfg["get_position"](ex_sym, direction)
qty = float(pos.get("contracts") or 0)
if not mon_ok or qty <= 0:
_close_roll_group(conn, cfg, g)
def _cancel_roll_leg_order(cfg: dict, group: dict, leg: dict) -> None:
oid = (leg.get("exchange_order_id") or "").strip()
if not oid:
return
symbol = group.get("symbol") or ""
ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
cancel = cfg.get("cancel_limit_order")
if callable(cancel):
try:
cancel(ex_sym, oid)
except Exception:
pass
def _contract_size(cfg: dict, ex_sym: str) -> float:
get_cs = cfg.get("get_contract_size")
if callable(get_cs):
try:
return float(get_cs(ex_sym) or 1.0)
except Exception:
pass
return 1.0
def _resolve_add_mode(leg: dict) -> str:
raw = (leg.get("add_mode") or "").strip().lower()
if raw in (MARKET_MODE, "market", "市价", "市价加仓"):
return MARKET_MODE
if "786" in raw or raw == "fib_786":
return "fib_786"
if "618" in raw or raw == "fib_618":
return "fib_618"
if raw in (BREAKOUT_MODE, "突破", "突破加仓"):
return BREAKOUT_MODE
if raw.startswith("fib"):
return raw.replace(".", "_").replace("0.", "0")
return raw or MARKET_MODE
def _check_pending_roll_legs(conn, cfg: dict) -> None:
rows = conn.execute(
"""SELECT l.*, g.symbol, g.exchange_symbol, g.direction, g.initial_take_profit,
g.order_monitor_id, g.risk_percent, g.leg_count
FROM roll_legs l
INNER JOIN roll_groups g ON g.id = l.roll_group_id AND g.status='active'
WHERE l.status='pending'"""
).fetchall()
for row in rows:
leg = _row_dict(row)
group = {
"id": leg["roll_group_id"],
"symbol": leg["symbol"],
"exchange_symbol": leg["exchange_symbol"],
"direction": leg["direction"],
"initial_take_profit": leg["initial_take_profit"],
"order_monitor_id": leg["order_monitor_id"],
"risk_percent": leg.get("risk_percent"),
"leg_count": leg.get("leg_count"),
}
_process_pending_roll_leg(conn, cfg, group, leg)
def _process_pending_roll_leg(conn, cfg: dict, group: dict, leg: dict) -> None:
symbol = group.get("symbol") or ""
direction = (group.get("direction") or "long").strip().lower()
ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
mark_fn = cfg.get("get_mark_price") or cfg.get("get_price")
mark = mark_fn(symbol) if callable(mark_fn) else None
if mark is None:
return
mark_f = float(mark)
prev_mark = leg.get("last_mark_price")
try:
prev_f = float(prev_mark) if prev_mark not in (None, "") else None
except (TypeError, ValueError):
prev_f = None
mode = _resolve_add_mode(leg)
sl = float(leg.get("new_stop_loss") or 0)
fib_u, fib_l = leg.get("fib_upper"), leg.get("fib_lower")
bp = leg.get("breakthrough_price")
if mode in FIB_MODES and fib_u is not None and fib_l is not None:
if roll_fib_invalidate(direction, mark_f, float(fib_u), float(fib_l)):
_invalidate_roll_leg(conn, cfg, group, leg, mark_f, reason="止盈侧突破")
return
elif mode == BREAKOUT_MODE and sl > 0:
if roll_breakout_invalidate(direction, mark_f, sl):
_invalidate_roll_leg(conn, cfg, group, leg, mark_f, reason="止损侧突破")
return
triggered = False
if mode in FIB_MODES:
lp = leg.get("limit_price")
if lp is not None and roll_fib_trigger_crossed(direction, prev_f, mark_f, float(lp)):
triggered = True
elif mode == BREAKOUT_MODE and bp is not None:
if roll_breakout_trigger_crossed(direction, prev_f, mark_f, float(bp)):
triggered = True
conn.execute(
"UPDATE roll_legs SET last_mark_price=? WHERE id=? AND status='pending'",
(mark_f, int(leg["id"])),
)
if triggered:
_execute_pending_roll_leg(conn, cfg, group, leg, ex_sym, direction, mark_f)
return
def _execute_pending_roll_leg(
conn,
cfg: dict,
group: dict,
leg: dict,
ex_sym: str,
direction: str,
mark: float,
) -> None:
leg_id = int(leg["id"])
gid = int(group["roll_group_id"]) if "roll_group_id" in leg else int(group["id"])
mon_id = group.get("order_monitor_id")
mon = None
if mon_id:
row = conn.execute("SELECT * FROM order_monitors WHERE id=?", (mon_id,)).fetchone()
mon = _row_dict(row) if row else None
if not mon or (mon.get("status") or "").strip().lower() != "active":
_invalidate_roll_leg(conn, cfg, group, leg, mark, reason="监控单已失效")
return
pos = cfg["get_position"](ex_sym, direction) or {}
qty = float(pos.get("contracts") or 0)
entry = float(pos.get("entry_price") or mon.get("trigger_price") or 0)
if qty <= 0 or entry <= 0:
_invalidate_roll_leg(conn, cfg, group, leg, mark, reason="无持仓")
return
filled = count_filled_roll_legs(conn, gid)
if filled >= max_roll_legs(direction):
_invalidate_roll_leg(conn, cfg, group, leg, mark, reason="滚仓次数已满")
return
try:
risk_pct = float(mon.get("risk_percent") or group.get("risk_percent") or 2)
except (TypeError, ValueError):
risk_pct = 2.0
conn_cap = cfg["get_db"]()
try:
capital = float(cfg["get_trading_capital_usdt"](conn_cap))
finally:
conn_cap.close()
cs = _contract_size(cfg, ex_sym)
sl = float(leg.get("new_stop_loss") or 0)
tp0 = float(group.get("initial_take_profit") or mon.get("take_profit") or 0)
mode = _resolve_add_mode(leg)
q2_raw, err = solve_add_amount_for_total_risk(
direction, qty, entry, mark, sl, calc_risk_budget_usdt(capital, risk_pct), cs
)
if err or q2_raw is None or float(q2_raw) <= 0:
_invalidate_roll_leg(conn, cfg, group, leg, mark, reason=err or "无法计算加仓张数")
return
amount = cfg["amount_to_precision"](ex_sym, float(q2_raw))
if amount is None or float(amount) <= 0:
_invalidate_roll_leg(conn, cfg, group, leg, mark, reason="加仓张数低于交易所最小精度")
return
lev_fn = cfg.get("default_leverage")
if not callable(lev_fn):
lev_fn = lambda _s: 5
leverage = int(lev_fn(group.get("symbol") or ""))
try:
order = cfg["market_add"](ex_sym, direction, float(amount), leverage)
fill = float(
cfg.get("resolve_fill_price", lambda o, s, p: p)(order, ex_sym, mark) or mark
)
except Exception as e:
fe = cfg.get("friendly_error")
msg = fe(e) if callable(fe) else str(e)
_notify_roll_fail(cfg, group, leg, mark, msg)
return
oid = str(order.get("id") or "") if isinstance(order, dict) else ""
cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon)
conn.execute(
"""UPDATE roll_legs SET status='filled', fill_price=?, amount=?, exchange_order_id=?,
new_stop_loss=? WHERE id=? AND status='pending'""",
(fill, float(amount), oid, sl, leg_id),
)
conn.execute(
"UPDATE roll_groups SET leg_count=?, current_stop_loss=?, updated_at=? WHERE id=?",
(filled + 1, sl, _now(cfg), gid),
)
conn.execute(
"UPDATE order_monitors SET stop_loss=? WHERE id=? AND status='active'",
(sl, mon["id"]),
)
notify = cfg.get("send_wechat")
if callable(notify):
sym = group.get("symbol") or ""
mode_lbl = leg.get("add_mode") or mode_label(mode)
fmt = cfg.get("format_price")
px_txt = fmt(sym, fill) if callable(fmt) else str(fill)
sl_txt = fmt(sym, sl) if callable(fmt) else str(sl)
acct = _wechat_account(cfg)
dir_txt = _wechat_dir(cfg, direction)
notify(
f"# ✅ {sym} 滚仓触价成交\n"
f"**账户:{acct}**\n"
f"- 方式:{mode_lbl}{dir_txt}\n"
f"- 成交价:{px_txt}|张数:{amount}\n"
f"- 新止损:{sl_txt}(止盈仍为首仓)\n"
)
def _invalidate_roll_leg(
conn,
cfg: dict,
group: dict,
leg: dict,
mark: float,
*,
reason: str = "",
) -> None:
leg_id = int(leg["id"])
cur = conn.execute("SELECT status FROM roll_legs WHERE id=?", (leg_id,)).fetchone()
if not cur or (cur[0] or "").strip().lower() in ("invalidated", "filled", "cancelled"):
return
_cancel_roll_leg_order(cfg, group, leg)
conn.execute(
"UPDATE roll_legs SET status='invalidated' WHERE id=? AND status='pending'",
(leg_id,),
)
_send_roll_invalidate_wechat(cfg, group, leg, mark, reason=reason)
def _notify_roll_fail(cfg: dict, group: dict, leg: dict, mark: float, reason: str) -> None:
notify = cfg.get("send_wechat")
if not callable(notify):
return
sym = group.get("symbol") or ""
mode = leg.get("add_mode") or "滚仓"
acct = _wechat_account(cfg)
notify(
f"# ❌ {sym} 滚仓触价成交失败\n"
f"**账户:{acct}**\n"
f"- 方式:{mode}\n"
f"- 原因:{reason}\n"
)
def _send_roll_invalidate_wechat(
cfg: dict, group: dict, leg: dict, mark: float, *, reason: str = ""
) -> None:
notify = cfg.get("send_wechat")
if not callable(notify):
return
sym = group.get("symbol") or ""
direction = (group.get("direction") or "long").strip().lower()
mode = leg.get("add_mode") or "滚仓监控"
fmt = cfg.get("format_price")
mark_txt = fmt(sym, mark) if callable(fmt) else str(mark)
acct = _wechat_account(cfg)
dir_txt = _wechat_dir(cfg, direction)
detail = reason or "条件不满足"
notify(
f"# ⚠️ {sym} 滚仓监控失效\n"
f"**账户:{acct}**\n"
f"- 方式:{mode}{dir_txt}\n"
f"- 标记价 {mark_txt}{detail}\n"
f"- 本条监控已结案,可重新提交\n"
)
def _wechat_account(cfg: dict) -> str:
fn = cfg.get("wechat_account_label")
if callable(fn):
try:
return str(fn())
except Exception:
pass
return str(cfg.get("exchange_display") or "")
def _wechat_dir(cfg: dict, direction: str) -> str:
fn = cfg.get("wechat_direction_text")
if callable(fn):
try:
return str(fn(direction))
except Exception:
pass
return "做多" if (direction or "long").strip().lower() == "long" else "做空"