d467760d5c
重写滚仓计仓与四种加仓方式(市价/斐波/突破),程序盯 mark 触价成交;风险读监控单;pending 可删不可改;手动平仓同步结束滚仓。新增 /strategy/roll/docs 说明页与顺势加仓滚仓说明.md。 Co-authored-by: Cursor <cursoragent@cursor.com>
521 lines
17 KiB
Python
521 lines
17 KiB
Python
"""滚仓程序监控:斐波/突破触价市价成交、失效、外部平仓同步(各所共用)。"""
|
||
from __future__ import annotations
|
||
|
||
from typing import Any, Optional
|
||
|
||
from strategy_roll_lib import (
|
||
BREAKOUT_MODE,
|
||
FIB_MODES,
|
||
MARKET_MODE,
|
||
mode_label,
|
||
roll_breakout_invalidate,
|
||
roll_breakout_trigger_crossed,
|
||
roll_fib_invalidate,
|
||
roll_fib_trigger_crossed,
|
||
calc_risk_budget_usdt,
|
||
max_roll_legs,
|
||
preview_roll,
|
||
solve_add_amount_for_total_risk,
|
||
)
|
||
from strategy_db import init_strategy_tables
|
||
|
||
# Maps each roll-leg status stored in the database to its display label.
ROLL_LEG_STATUS_LABELS = dict(
    pending="监控中",
    filled="已成交",
    cancelled="已删除",
    invalidated="已失效",
)
|
||
|
||
|
||
def roll_leg_status_label(status: Optional[str]) -> str:
    """Return the display label for a roll-leg status.

    The status is trimmed and lower-cased before the lookup in
    ``ROLL_LEG_STATUS_LABELS``. An unknown status is returned unchanged;
    an empty or ``None`` status yields an em dash.
    """
    key = (status or "").strip().lower()
    if key in ROLL_LEG_STATUS_LABELS:
        return ROLL_LEG_STATUS_LABELS[key]
    return status or "—"
|
||
|
||
|
||
def check_roll_monitors(cfg: dict[str, Any]) -> None:
    """Run one monitoring sweep over all active roll groups and pending legs.

    Opens a connection via ``cfg["get_db"]``, ensures the strategy tables
    exist, reconciles active roll groups, evaluates pending legs, and commits.
    Any exception rolls the transaction back and is NOT re-raised (the sweep
    is best-effort), but it is logged so that failures are no longer silent.
    The connection is always closed.

    Args:
        cfg: Runtime configuration/callback mapping; must contain ``get_db``.
    """
    # Function-scope import: this module has no top-level ``logging`` import.
    import logging

    conn = cfg["get_db"]()
    try:
        init_strategy_tables(conn)
        _reconcile_roll_groups(conn, cfg)
        _check_pending_roll_legs(conn, cfg)
        conn.commit()
    except Exception:
        # Keep the caller alive, but leave a traceback behind: previously
        # every failure in the sweep vanished without any trace.
        logging.getLogger(__name__).exception("roll monitor sweep failed; rolling back")
        try:
            conn.rollback()
        except Exception:
            # Rollback is best-effort; the connection is closed below anyway.
            pass
    finally:
        try:
            conn.close()
        except Exception:
            pass
|
||
|
||
|
||
def sync_roll_after_external_close(
    cfg: dict, conn, symbol: str, direction: str, *, reason: str = "持仓已平"
) -> dict[str, Any]:
    """Close active roll groups after a position was closed manually.

    For every active roll group matching the symbol and direction, pending
    legs are cancelled and the group is marked ``closed``; filled legs are
    left untouched. This function does not commit on ``conn``.

    NOTE(review): the original indentation was lost in this copy of the file;
    the notification and snapshot are assumed to run only when the UPDATE
    actually closed the group - confirm against the repository.

    Args:
        cfg: Runtime configuration/callback mapping.
        conn: Open database connection.
        symbol: Symbol as entered; passed through
            ``cfg["normalize_symbol_input"]`` when that is callable,
            otherwise stripped.
        direction: Position direction; empty values default to ``"long"``.
        reason: Text forwarded to the group-ended notification.

    Returns:
        A dict with ``ok``, plus ``symbol``, ``direction``, ``closed_groups``
        and ``cancelled_legs`` on success, or ``msg`` when the symbol is empty.
    """
    normalizer = cfg.get("normalize_symbol_input")
    if callable(normalizer):
        sym = normalizer(symbol)
    else:
        sym = (symbol or "").strip()
    if not sym:
        return {"ok": False, "msg": "symbol 无效", "closed_groups": 0, "cancelled_legs": 0}

    direction = (direction or "long").strip().lower()
    init_strategy_tables(conn)
    active_groups = conn.execute(
        """SELECT g.* FROM roll_groups g
           WHERE g.status='active' AND g.symbol=? AND g.direction=?""",
        (sym, direction),
    ).fetchall()

    closed_groups = 0
    cancelled_legs = 0
    for record in active_groups:
        grp = _row_dict(record)
        cancelled_legs += _cancel_pending_legs_for_group(conn, cfg, grp, status="cancelled")
        result = conn.execute(
            "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
            (_now(cfg), int(grp["id"])),
        )
        if not getattr(result, "rowcount", 0):
            # Nothing was closed by this statement; skip notification/snapshot.
            continue
        closed_groups += 1

        # Notification and snapshot are best-effort: a failure in either must
        # not prevent the remaining groups from being closed.
        try:
            from strategy_wechat_notify import notify_roll_group_ended

            notify_roll_group_ended(
                cfg,
                group_id=int(grp["id"]),
                symbol=sym,
                direction=direction,
                reason=reason,
                leg_count=int(grp.get("leg_count") or 0),
            )
        except Exception:
            pass
        try:
            from strategy_snapshot_lib import save_roll_group_snapshot

            save_roll_group_snapshot(cfg, conn, grp, result_label="结束")
        except Exception:
            pass

    return {
        "ok": True,
        "symbol": sym,
        "direction": direction,
        "closed_groups": closed_groups,
        "cancelled_legs": cancelled_legs,
    }
|
||
|
||
|
||
def cancel_roll_pending_leg(cfg: dict, conn, leg_id: int) -> tuple[bool, str]:
    """Delete a pending roll leg on user request (pending legs cannot be edited).

    Looks the leg up together with its group's symbol data, cancels any
    exchange order attached to it, marks it ``cancelled`` and commits.

    Args:
        cfg: Runtime configuration/callback mapping.
        conn: Open database connection; committed on success.
        leg_id: Primary key of the roll leg.

    Returns:
        ``(ok, message)``. ``ok`` is False when the leg does not exist or is
        not in ``pending`` status.
    """
    init_strategy_tables(conn)
    # g.exchange_symbol is selected explicitly: the original query omitted it
    # while the code below read leg["exchange_symbol"], so the stored exchange
    # symbol was never used when cancelling the order.
    row = conn.execute(
        "SELECT l.*, g.symbol, g.exchange_symbol, g.direction, g.status AS group_status "
        "FROM roll_legs l "
        "INNER JOIN roll_groups g ON g.id = l.roll_group_id WHERE l.id=?",
        (int(leg_id),),
    ).fetchone()
    if not row:
        return False, "滚仓腿不存在"
    leg = _row_dict(row)
    if (leg.get("status") or "").strip().lower() != "pending":
        return False, "仅监控中的腿可删除"

    group_view = {
        "symbol": leg.get("symbol"),
        "exchange_symbol": leg.get("exchange_symbol"),
    }
    _cancel_roll_leg_order(cfg, group_view, leg)
    conn.execute(
        "UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'",
        (int(leg_id),),
    )
    conn.commit()
    return True, "已删除滚仓监控"
|
||
|
||
|
||
def count_filled_roll_legs(conn, roll_group_id: int) -> int:
    """Return how many legs of the given roll group have status ``filled``."""
    sql = "SELECT COUNT(*) FROM roll_legs WHERE roll_group_id=? AND status='filled'"
    result = conn.execute(sql, (int(roll_group_id),)).fetchone()
    if not result:
        return 0
    return int(result[0])
|
||
|
||
|
||
def count_pending_roll_legs(conn, roll_group_id: int) -> int:
    """Return how many legs of the given roll group have status ``pending``."""
    sql = "SELECT COUNT(*) FROM roll_legs WHERE roll_group_id=? AND status='pending'"
    result = conn.execute(sql, (int(roll_group_id),)).fetchone()
    if not result:
        return 0
    return int(result[0])
|
||
|
||
|
||
def _row_dict(row) -> dict:
|
||
if row is None:
|
||
return {}
|
||
try:
|
||
return dict(row)
|
||
except Exception:
|
||
return {}
|
||
|
||
|
||
def _now(cfg: dict) -> str:
|
||
fn = cfg.get("app_now_str")
|
||
return fn() if callable(fn) else ""
|
||
|
||
|
||
def _cancel_pending_legs_for_group(conn, cfg: dict, group: dict, *, status: str = "cancelled") -> int:
    """Move every pending leg of a roll group to ``status``.

    For each pending leg, any attached exchange order is cancelled first and
    the leg row is then updated. Does not commit.

    Returns:
        The number of pending legs that were processed.
    """
    group_id = int(group["id"])
    pending_rows = conn.execute(
        "SELECT * FROM roll_legs WHERE roll_group_id=? AND status='pending'",
        (group_id,),
    ).fetchall()

    processed = 0
    for pending in pending_rows:
        leg = _row_dict(pending)
        _cancel_roll_leg_order(cfg, group, leg)
        conn.execute(
            "UPDATE roll_legs SET status=? WHERE id=? AND status='pending'",
            (status, leg["id"]),
        )
        processed += 1
    return processed
|
||
|
||
|
||
def _close_roll_group(conn, cfg: dict, group: dict, *, reason: str = "下单监控已结案或交易所无同向持仓") -> None:
    """Cancel a roll group's pending legs and mark the group ``closed``.

    When the UPDATE actually closes the group, a group-ended notification is
    sent and a snapshot is saved; both are best-effort and their failures are
    ignored. Does not commit.

    NOTE(review): the original indentation was lost in this copy of the file;
    the snapshot is assumed to sit inside the same ``rowcount`` guard as the
    notification - confirm against the repository.
    """
    group_id = int(group["id"])
    _cancel_pending_legs_for_group(conn, cfg, group, status="cancelled")
    result = conn.execute(
        "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
        (_now(cfg), group_id),
    )
    if not getattr(result, "rowcount", 0):
        # The group was not active any more; nothing to announce.
        return

    try:
        from strategy_wechat_notify import notify_roll_group_ended

        notify_roll_group_ended(
            cfg,
            group_id=group_id,
            symbol=group.get("symbol") or "",
            direction=group.get("direction") or "long",
            reason=reason,
            leg_count=int(group.get("leg_count") or 0),
        )
    except Exception:
        pass
    try:
        from strategy_snapshot_lib import save_roll_group_snapshot

        save_roll_group_snapshot(cfg, conn, group, result_label="结束")
    except Exception:
        pass
|
||
|
||
|
||
def _reconcile_roll_groups(conn, cfg: dict) -> None:
    """Close active roll groups whose monitor or position no longer exists.

    A group stays active only while its linked order monitor has status
    ``active`` AND ``cfg["get_position"]`` reports a positive ``contracts``
    value for the group's symbol and direction. Otherwise the group is closed
    through ``_close_roll_group``. Does not commit.

    Changes versus the previous version:
      * a ``None`` result from ``get_position`` is treated as "no position"
        (as ``_execute_pending_roll_leg`` already does) instead of raising
        ``AttributeError`` and aborting the whole sweep;
      * the position is no longer queried when the monitor is already
        inactive, since the group is closed in that case regardless.
    """
    rows = conn.execute(
        """SELECT g.*, m.status AS monitor_status
           FROM roll_groups g
           LEFT JOIN order_monitors m ON m.id = g.order_monitor_id
           WHERE g.status='active'"""
    ).fetchall()
    for row in rows:
        g = _row_dict(row)
        # LEFT JOIN: monitor_status is NULL when the monitor row is missing.
        mon_ok = (row["monitor_status"] or "").strip().lower() == "active"
        has_position = False
        if mon_ok:
            symbol = g.get("symbol") or ""
            direction = (g.get("direction") or "long").strip().lower()
            ex_sym = g.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
            pos = cfg["get_position"](ex_sym, direction) or {}
            has_position = float(pos.get("contracts") or 0) > 0
        if not (mon_ok and has_position):
            _close_roll_group(conn, cfg, g)
|
||
|
||
|
||
def _cancel_roll_leg_order(cfg: dict, group: dict, leg: dict) -> None:
|
||
oid = (leg.get("exchange_order_id") or "").strip()
|
||
if not oid:
|
||
return
|
||
symbol = group.get("symbol") or ""
|
||
ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
|
||
cancel = cfg.get("cancel_limit_order")
|
||
if callable(cancel):
|
||
try:
|
||
cancel(ex_sym, oid)
|
||
except Exception:
|
||
pass
|
||
|
||
|
||
def _contract_size(cfg: dict, ex_sym: str) -> float:
|
||
get_cs = cfg.get("get_contract_size")
|
||
if callable(get_cs):
|
||
try:
|
||
return float(get_cs(ex_sym) or 1.0)
|
||
except Exception:
|
||
pass
|
||
return 1.0
|
||
|
||
|
||
def _resolve_add_mode(leg: dict) -> str:
    """Normalize a leg's ``add_mode`` value to a canonical mode string.

    The value is trimmed and lower-cased, then matched in this order: market
    aliases, any text containing ``786``, any text containing ``618``,
    breakout aliases, other ``fib``-prefixed text (dots become underscores).
    Anything else is returned as-is; an empty value resolves to
    ``MARKET_MODE``.
    """
    raw = (leg.get("add_mode") or "").strip().lower()

    market_aliases = (MARKET_MODE, "market", "市价", "市价加仓")
    if raw in market_aliases:
        return MARKET_MODE

    # Substring checks also cover the exact values "fib_786" / "fib_618".
    for digits in ("786", "618"):
        if digits in raw:
            return "fib_" + digits

    breakout_aliases = (BREAKOUT_MODE, "突破", "突破加仓")
    if raw in breakout_aliases:
        return BREAKOUT_MODE

    if raw.startswith("fib"):
        # The original chained a second .replace("0.", "0") here, which can
        # never match once every "." is already replaced; dropping it leaves
        # results unchanged.
        # NOTE(review): if "fib_0.5" was meant to become "fib_05", the
        # replaces were in the wrong order - confirm against FIB_MODES.
        return raw.replace(".", "_")

    return raw or MARKET_MODE
|
||
|
||
|
||
def _check_pending_roll_legs(conn, cfg: dict) -> None:
    """Evaluate every pending leg that belongs to an active roll group.

    Each joined row carries both leg columns and selected group columns; the
    group fields are split out into their own dict before the leg is handed
    to ``_process_pending_roll_leg``.
    """
    pending_rows = conn.execute(
        """SELECT l.*, g.symbol, g.exchange_symbol, g.direction, g.initial_take_profit,
                  g.order_monitor_id, g.risk_percent, g.leg_count
           FROM roll_legs l
           INNER JOIN roll_groups g ON g.id = l.roll_group_id AND g.status='active'
           WHERE l.status='pending'"""
    ).fetchall()

    # Group columns that must be present in the joined row.
    required_fields = (
        "symbol",
        "exchange_symbol",
        "direction",
        "initial_take_profit",
        "order_monitor_id",
    )
    for record in pending_rows:
        leg = _row_dict(record)
        group = {"id": leg["roll_group_id"]}
        for field in required_fields:
            group[field] = leg[field]
        group["risk_percent"] = leg.get("risk_percent")
        group["leg_count"] = leg.get("leg_count")
        _process_pending_roll_leg(conn, cfg, group, leg)
|
||
|
||
|
||
def _process_pending_roll_leg(conn, cfg: dict, group: dict, leg: dict) -> None:
    """Check one pending leg against the current price.

    Steps, in order:
      1. Read the price from ``cfg["get_mark_price"]`` (or ``get_price``);
         return without changes when no price is available.
      2. Invalidate the leg when its mode's invalidation predicate fires.
      3. Evaluate the mode's trigger predicate using the previously stored
         price and the current one.
      4. Store the current price as ``last_mark_price``.
      5. Execute the leg when the trigger fired.
    """
    symbol = group.get("symbol") or ""
    direction = (group.get("direction") or "long").strip().lower()
    ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)

    price_fn = cfg.get("get_mark_price") or cfg.get("get_price")
    mark = price_fn(symbol) if callable(price_fn) else None
    if mark is None:
        return
    mark_f = float(mark)

    # Price seen on the previous sweep; None when absent or unparsable.
    stored = leg.get("last_mark_price")
    if stored in (None, ""):
        prev_f = None
    else:
        try:
            prev_f = float(stored)
        except (TypeError, ValueError):
            prev_f = None

    mode = _resolve_add_mode(leg)
    sl = float(leg.get("new_stop_loss") or 0)
    fib_u = leg.get("fib_upper")
    fib_l = leg.get("fib_lower")
    bp = leg.get("breakthrough_price")
    is_fib = mode in FIB_MODES

    # --- invalidation -------------------------------------------------------
    invalid_reason = None
    if is_fib and fib_u is not None and fib_l is not None:
        if roll_fib_invalidate(direction, mark_f, float(fib_u), float(fib_l)):
            invalid_reason = "止盈侧突破"
    elif mode == BREAKOUT_MODE and sl > 0:
        if roll_breakout_invalidate(direction, mark_f, sl):
            invalid_reason = "止损侧突破"
    if invalid_reason is not None:
        _invalidate_roll_leg(conn, cfg, group, leg, mark_f, reason=invalid_reason)
        return

    # --- trigger ------------------------------------------------------------
    if is_fib:
        lp = leg.get("limit_price")
        triggered = lp is not None and roll_fib_trigger_crossed(direction, prev_f, mark_f, float(lp))
    elif mode == BREAKOUT_MODE and bp is not None:
        triggered = roll_breakout_trigger_crossed(direction, prev_f, mark_f, float(bp))
    else:
        triggered = False

    # Remember this price so the next sweep can compare against it.
    conn.execute(
        "UPDATE roll_legs SET last_mark_price=? WHERE id=? AND status='pending'",
        (mark_f, int(leg["id"])),
    )

    if triggered:
        _execute_pending_roll_leg(conn, cfg, group, leg, ex_sym, direction, mark_f)
|
||
|
||
|
||
def _execute_pending_roll_leg(
    conn,
    cfg: dict,
    group: dict,
    leg: dict,
    ex_sym: str,
    direction: str,
    mark: float,
) -> None:
    """Size and place the market add for a triggered leg, then record the fill.

    The leg is invalidated (no order placed) when the linked order monitor is
    not active, there is no position, the filled-leg limit is reached, or no
    valid add amount can be computed. If the market order itself fails, a
    failure notification is sent and the leg stays pending.

    Once the market order has been accepted the fill is always persisted and
    committed immediately, so a later error in the same sweep cannot roll the
    leg back to ``pending`` and cause a second market order.

    Fixes versus the previous version:
      * the group id was looked up as ``group["roll_group_id"]`` although the
        membership test was done on ``leg``; the group dict has no such key,
        so every triggered leg raised ``KeyError``;
      * a failure while resolving the fill price was reported as an order
        failure even though the order was already live;
      * a failure in ``replace_tpsl`` left the executed order unrecorded.

    Args:
        conn: Open database connection (committed after a successful order).
        cfg: Runtime configuration/callback mapping.
        group: Roll-group fields for this leg.
        leg: The pending leg row as a dict.
        ex_sym: Exchange symbol for the order.
        direction: Lower-cased position direction.
        mark: Price that triggered the leg.
    """

    def _friendly(exc: Exception) -> str:
        # Human-readable error text via the optional cfg hook.
        fe = cfg.get("friendly_error")
        return fe(exc) if callable(fe) else str(exc)

    leg_id = int(leg["id"])
    # The joined leg row carries roll_group_id; the group dict only has "id".
    raw_gid = leg.get("roll_group_id")
    gid = int(raw_gid) if raw_gid is not None else int(group["id"])

    mon_id = group.get("order_monitor_id")
    mon = None
    if mon_id:
        row = conn.execute("SELECT * FROM order_monitors WHERE id=?", (mon_id,)).fetchone()
        mon = _row_dict(row) if row else None
    if not mon or (mon.get("status") or "").strip().lower() != "active":
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="监控单已失效")
        return

    pos = cfg["get_position"](ex_sym, direction) or {}
    qty = float(pos.get("contracts") or 0)
    entry = float(pos.get("entry_price") or mon.get("trigger_price") or 0)
    if qty <= 0 or entry <= 0:
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="无持仓")
        return

    filled = count_filled_roll_legs(conn, gid)
    if filled >= max_roll_legs(direction):
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="滚仓次数已满")
        return

    # Risk percent: monitor value first, then the group's, then 2.
    try:
        risk_pct = float(mon.get("risk_percent") or group.get("risk_percent") or 2)
    except (TypeError, ValueError):
        risk_pct = 2.0
    # Capital is read through a separate short-lived connection.
    conn_cap = cfg["get_db"]()
    try:
        capital = float(cfg["get_trading_capital_usdt"](conn_cap))
    finally:
        conn_cap.close()

    cs = _contract_size(cfg, ex_sym)
    sl = float(leg.get("new_stop_loss") or 0)
    tp0 = float(group.get("initial_take_profit") or mon.get("take_profit") or 0)
    mode = _resolve_add_mode(leg)

    q2_raw, err = solve_add_amount_for_total_risk(
        direction, qty, entry, mark, sl, calc_risk_budget_usdt(capital, risk_pct), cs
    )
    if err or q2_raw is None or float(q2_raw) <= 0:
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason=err or "无法计算加仓张数")
        return

    amount = cfg["amount_to_precision"](ex_sym, float(q2_raw))
    if amount is None or float(amount) <= 0:
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="加仓张数低于交易所最小精度")
        return

    lev_fn = cfg.get("default_leverage")
    leverage = int(lev_fn(group.get("symbol") or "")) if callable(lev_fn) else 5

    # Only the order placement counts as an "order failed" outcome.
    try:
        order = cfg["market_add"](ex_sym, direction, float(amount), leverage)
    except Exception as e:
        _notify_roll_fail(cfg, group, leg, mark, _friendly(e))
        return

    # ---- The order is live from here on: the fill must be recorded. ----
    resolver = cfg.get("resolve_fill_price")
    try:
        resolved = resolver(order, ex_sym, mark) if callable(resolver) else mark
        fill = float(resolved or mark)
    except Exception:
        # Fall back to the trigger price rather than losing the fill record.
        fill = float(mark)

    oid = str(order.get("id") or "") if isinstance(order, dict) else ""

    tpsl_error = None
    try:
        cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon)
    except Exception as e:
        # The position was already enlarged; record it and alert below.
        tpsl_error = _friendly(e)

    conn.execute(
        """UPDATE roll_legs SET status='filled', fill_price=?, amount=?, exchange_order_id=?,
           new_stop_loss=? WHERE id=? AND status='pending'""",
        (fill, float(amount), oid, sl, leg_id),
    )
    conn.execute(
        "UPDATE roll_groups SET leg_count=?, current_stop_loss=?, updated_at=? WHERE id=?",
        (filled + 1, sl, _now(cfg), gid),
    )
    conn.execute(
        "UPDATE order_monitors SET stop_loss=? WHERE id=? AND status='active'",
        (sl, mon["id"]),
    )
    # Persist immediately: an exception later in this sweep triggers a
    # rollback in the caller, which must not undo an executed order.
    conn.commit()

    notify = cfg.get("send_wechat")
    if callable(notify):
        sym = group.get("symbol") or ""
        mode_lbl = leg.get("add_mode") or mode_label(mode)
        fmt = cfg.get("format_price")
        px_txt = fmt(sym, fill) if callable(fmt) else str(fill)
        sl_txt = fmt(sym, sl) if callable(fmt) else str(sl)
        acct = _wechat_account(cfg)
        dir_txt = _wechat_dir(cfg, direction)
        if tpsl_error is None:
            stop_line = f"- 新止损:{sl_txt}(止盈仍为首仓)\n"
        else:
            stop_line = f"- ⚠️ 止盈止损更新失败:{tpsl_error}(目标止损 {sl_txt},请手动检查)\n"
        notify(
            f"# ✅ {sym} 滚仓触价成交\n"
            f"**账户:{acct}**\n"
            f"- 方式:{mode_lbl}|{dir_txt}\n"
            f"- 成交价:{px_txt}|张数:{amount}\n"
            f"{stop_line}"
        )
|
||
|
||
|
||
def _invalidate_roll_leg(
    conn,
    cfg: dict,
    group: dict,
    leg: dict,
    mark: float,
    *,
    reason: str = "",
) -> None:
    """Mark a pending leg as ``invalidated`` and notify.

    The leg's current status is re-read first; nothing happens when the row
    is gone or already ``invalidated``, ``filled`` or ``cancelled``.
    Otherwise any attached exchange order is cancelled, the row is updated
    and an invalidation message is sent. Does not commit.
    """
    leg_id = int(leg["id"])
    current = conn.execute("SELECT status FROM roll_legs WHERE id=?", (leg_id,)).fetchone()
    if not current:
        return
    terminal_statuses = ("invalidated", "filled", "cancelled")
    if (current[0] or "").strip().lower() in terminal_statuses:
        return

    _cancel_roll_leg_order(cfg, group, leg)
    conn.execute(
        "UPDATE roll_legs SET status='invalidated' WHERE id=? AND status='pending'",
        (leg_id,),
    )
    _send_roll_invalidate_wechat(cfg, group, leg, mark, reason=reason)
|
||
|
||
|
||
def _notify_roll_fail(cfg: dict, group: dict, leg: dict, mark: float, reason: str) -> None:
    """Send a "triggered add failed" message through ``cfg["send_wechat"]``.

    No-op when that callback is missing or not callable. ``mark`` is accepted
    for signature symmetry with the other notifiers but is not used here.
    """
    send = cfg.get("send_wechat")
    if not callable(send):
        return
    sym = group.get("symbol") or ""
    mode = leg.get("add_mode") or "滚仓"
    acct = _wechat_account(cfg)
    lines = (
        f"# ❌ {sym} 滚仓触价成交失败\n",
        f"**账户:{acct}**\n",
        f"- 方式:{mode}\n",
        f"- 原因:{reason}\n",
    )
    send("".join(lines))
|
||
|
||
|
||
def _send_roll_invalidate_wechat(
    cfg: dict, group: dict, leg: dict, mark: float, *, reason: str = ""
) -> None:
    """Send a "roll monitor invalidated" message through ``cfg["send_wechat"]``.

    No-op when that callback is missing or not callable. The price is
    rendered with ``cfg["format_price"]`` when available, otherwise ``str()``.
    An empty ``reason`` is replaced by a generic default text.
    """
    send = cfg.get("send_wechat")
    if not callable(send):
        return

    sym = group.get("symbol") or ""
    direction = (group.get("direction") or "long").strip().lower()
    mode = leg.get("add_mode") or "滚仓监控"

    formatter = cfg.get("format_price")
    if callable(formatter):
        mark_txt = formatter(sym, mark)
    else:
        mark_txt = str(mark)

    acct = _wechat_account(cfg)
    dir_txt = _wechat_dir(cfg, direction)
    detail = reason if reason else "条件不满足"

    lines = (
        f"# ⚠️ {sym} 滚仓监控失效\n",
        f"**账户:{acct}**\n",
        f"- 方式:{mode}|{dir_txt}\n",
        f"- 标记价 {mark_txt}|{detail}\n",
        "- 本条监控已结案,可重新提交\n",
    )
    send("".join(lines))
|
||
|
||
|
||
def _wechat_account(cfg: dict) -> str:
|
||
fn = cfg.get("wechat_account_label")
|
||
if callable(fn):
|
||
try:
|
||
return str(fn())
|
||
except Exception:
|
||
pass
|
||
return str(cfg.get("exchange_display") or "")
|
||
|
||
|
||
def _wechat_dir(cfg: dict, direction: str) -> str:
|
||
fn = cfg.get("wechat_direction_text")
|
||
if callable(fn):
|
||
try:
|
||
return str(fn(direction))
|
||
except Exception:
|
||
pass
|
||
return "做多" if (direction or "long").strip().lower() == "long" else "做空"
|