Add futures roll strategy with breakout monitoring and fixed-amount sizing.

Replace percent-based risk with system fixed amount, support market/breakout add modes only, allow pending submission outside trading hours, and fix short breakout geometry plus route registration.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-29 12:05:21 +08:00
parent 7ce59d2d71
commit 44bec23296
8 changed files with 982 additions and 160 deletions
+15
View File
@@ -122,6 +122,16 @@ CREATE TABLE IF NOT EXISTS ctp_sim_positions (
"""
ROLL_LEG_EXTRA_COLUMNS = (
"ALTER TABLE roll_legs ADD COLUMN limit_price REAL",
"ALTER TABLE roll_legs ADD COLUMN breakthrough_price REAL",
"ALTER TABLE roll_legs ADD COLUMN last_mark_price REAL",
"ALTER TABLE roll_legs ADD COLUMN invalidated_reason TEXT",
"ALTER TABLE roll_legs ADD COLUMN capital_snapshot REAL",
"ALTER TABLE trade_order_monitors ADD COLUMN risk_percent REAL",
)
_TABLES_READY = False
@@ -143,6 +153,11 @@ def init_strategy_tables(conn) -> None:
conn.execute("ALTER TABLE trend_pullback_plans ADD COLUMN period TEXT DEFAULT '15m'")
except Exception:
pass
for sql in ROLL_LEG_EXTRA_COLUMNS:
try:
conn.execute(sql)
except Exception:
pass
if not conn.execute("SELECT id FROM ctp_sim_account WHERE id=1").fetchone():
conn.execute("INSERT INTO ctp_sim_account (id, balance, available) VALUES (1, 100000, 100000)")
conn.commit()
+220 -25
View File
@@ -3,25 +3,49 @@
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""顺势加仓(滚仓):纯计算,期货版(手数整数、乘数计入盈亏)。"""
"""顺势加仓(滚仓):纯计算与校验,期货版(手数整数、乘数计入盈亏)。"""
from __future__ import annotations
import math
from typing import Any, Optional, Tuple
from position_sizing import MODE_AMOUNT
from strategy.fib_lib import calc_fib_plan
ROLL_MAX_LEGS_LONG = 3
ROLL_MAX_LEGS_SHORT = 3
ROLL_STOP_OFFSET_PCT_DEFAULT = 1.0
FIB_MODES = frozenset({"fib_618", "fib_786"})
ADD_MODE_MARKET = "market"
ADD_MODE_FIB_618 = "fib_618"
ADD_MODE_FIB_786 = "fib_786"
ADD_MODE_BREAKOUT = "breakout"
FIB_MODES = frozenset({ADD_MODE_FIB_618, ADD_MODE_FIB_786})
PENDING_MODES = frozenset({ADD_MODE_FIB_618, ADD_MODE_FIB_786, ADD_MODE_BREAKOUT})
ADD_MODE_LABELS = {
ADD_MODE_MARKET: "市价加仓",
ADD_MODE_FIB_618: "斐波0.618",
ADD_MODE_FIB_786: "斐波0.786",
ADD_MODE_BREAKOUT: "突破加仓",
}
LEG_STATUS_PENDING = "pending"
LEG_STATUS_FILLED = "filled"
LEG_STATUS_CANCELLED = "cancelled"
LEG_STATUS_INVALIDATED = "invalidated"
def add_mode_label(mode: str) -> str:
return ADD_MODE_LABELS.get((mode or "").strip().lower(), mode or "")
def fib_ratio_from_mode(mode: str) -> Optional[float]:
m = (mode or "").strip().lower()
if m in ("fib_618", "618", "0.618"):
if m in (ADD_MODE_FIB_618, "618", "0.618"):
return 0.618
if m in ("fib_786", "786", "0.786"):
if m in (ADD_MODE_FIB_786, "786", "0.786"):
return 0.786
return None
@@ -75,6 +99,7 @@ def solve_add_lots_for_total_risk(
risk_budget: float,
mult: int,
) -> Tuple[Optional[int], Optional[str]]:
"""方案 C:合并持仓打到新止损 S 时总亏损 ≤ B。"""
q1, e1, e2, sl, b = float(qty_existing), float(entry_existing), float(add_price), float(new_stop), float(risk_budget)
m = float(mult)
direction = (direction or "long").strip().lower()
@@ -89,10 +114,142 @@ def solve_add_lots_for_total_risk(
q2 = numer / denom
lots = lots_precise(q2)
if lots < 1:
return None, "按总风险%无需再加仓或无法再加"
return None, "已满足风险上限或无法再加"
return lots, None
def roll_eligibility_error(
*,
sizing_mode: str,
monitor: dict,
has_active_trend: bool,
legs_done: int = 0,
has_pending_leg: bool = False,
) -> Optional[str]:
if normalize_sizing_mode(sizing_mode) != MODE_AMOUNT:
return "仅固定金额(以损定仓)模式可滚仓"
if has_active_trend:
return "趋势回调运行中,不可滚仓"
if not monitor or (monitor.get("status") or "").strip().lower() != "active":
return "无有效持仓监控"
if int(monitor.get("trailing_be") or 0):
return "移动保本持仓不可滚仓"
direction = (monitor.get("direction") or "long").strip().lower()
if legs_done >= max_roll_legs(direction):
return f"滚仓已达 {max_roll_legs(direction)} 次上限"
if has_pending_leg:
return "已有监控中的加仓腿,请等待成交或删除后再提交"
if int(monitor.get("lots") or 0) < 1:
return "持仓手数为 0"
if not float(monitor.get("take_profit") or 0):
return "首仓须设置止盈(移动保本不可滚仓)"
return None
def normalize_sizing_mode(raw: str) -> str:
from position_sizing import normalize_sizing_mode as _norm
return _norm(raw)
def resolve_risk_percent(monitor: dict, *, default: float) -> float:
try:
rp = float(monitor.get("risk_percent") or 0)
if rp > 0:
return rp
except (TypeError, ValueError):
pass
return float(default)
def validate_roll_geometry(
direction: str,
add_mode: str,
new_stop: float,
*,
mark_price: float,
limit_price: Optional[float] = None,
breakthrough_price: Optional[float] = None,
at_trigger: bool = False,
) -> Optional[str]:
"""几何校验。
做多斐波(回调):止损 < 触发价 < 当前价
做多突破(向上):止损 < 突破价 < 当前价
做空斐波(反弹):当前价 < 触发价 < 止损
做空突破(向下):突破价 < 当前价 < 止损(提交时);触发后当前价可 ≤ 突破价
"""
direction = (direction or "long").strip().lower()
mode = (add_mode or ADD_MODE_MARKET).strip().lower()
sl = float(new_stop)
mark = float(mark_price)
if sl <= 0 or mark <= 0:
return "止损或参考价无效"
if mode == ADD_MODE_MARKET:
if direction == "long" and sl >= mark:
return "做多:新止损须低于当前价"
if direction == "short" and sl <= mark:
return "做空:新止损须高于当前价"
return None
trigger = None
if mode in FIB_MODES:
trigger = float(limit_price or 0)
if trigger <= 0:
return "须填写斐波触发价"
if direction == "long":
if not (sl < trigger < mark):
return "做多斐波:须满足 止损 < 触发价 < 当前价"
else:
if not (mark < trigger < sl):
return "做空斐波:须满足 当前价 < 触发价 < 止损"
return None
if mode == ADD_MODE_BREAKOUT:
trigger = float(breakthrough_price or 0)
if trigger <= 0:
return "须填写突破价"
if at_trigger:
if direction == "long":
if not (sl < trigger <= mark):
return "做多突破:触发时须满足 止损 < 突破价 ≤ 当前价"
else:
if not (trigger < sl and mark < sl):
return "做空突破:触发时须满足 突破价 < 止损且当前价 < 止损"
return None
if direction == "long":
if not (sl < trigger < mark):
return "做多突破:须满足 止损 < 突破价 < 当前价"
else:
if not (trigger < mark < sl):
return "做空突破:须满足 突破价 < 当前价 < 止损"
return None
return "加仓方式无效"
def detect_mark_cross(
direction: str,
add_mode: str,
prev_mark: float,
mark: float,
trigger_price: float,
) -> bool:
"""标记价穿越触发价(上一 tick 与当前 tick 比较)。"""
direction = (direction or "long").strip().lower()
mode = (add_mode or "").strip().lower()
p = float(trigger_price)
prev_m = float(prev_mark)
cur_m = float(mark)
if p <= 0 or prev_m <= 0 or cur_m <= 0:
return False
if mode in FIB_MODES:
if direction == "long":
return prev_m > p and cur_m <= p
return prev_m < p and cur_m >= p
if mode == ADD_MODE_BREAKOUT:
if direction == "long":
return prev_m < p and cur_m >= p
return prev_m > p and cur_m <= p
return False
def preview_roll(
*,
direction: str,
@@ -102,39 +259,70 @@ def preview_roll(
initial_take_profit: float,
add_mode: str,
new_stop_loss: float,
risk_percent: float,
capital_base: float,
risk_budget: float,
mult: int,
mark_price: Optional[float] = None,
add_price: Optional[float] = None,
limit_price: Optional[float] = None,
breakthrough_price: Optional[float] = None,
fib_upper: Optional[float] = None,
fib_lower: Optional[float] = None,
legs_done: int = 0,
at_trigger: bool = False,
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
direction = (direction or "long").strip().lower()
if legs_done >= max_roll_legs(direction):
return None, f"滚仓已达 {max_roll_legs(direction)} 次上限"
mode = (add_mode or "market").strip().lower()
if mode == "market":
if not add_price or add_price <= 0:
return None, "需要有效参考价"
entry_add = float(add_price)
mode_label = "市价"
elif mode in FIB_MODES:
if fib_upper is None or fib_lower is None:
return None, "斐波须填上沿/下沿"
entry_add, err = fib_limit_entry(direction, float(fib_upper), float(fib_lower), mode)
if err:
return None, err
mode_label = "斐波0.618" if "618" in mode else "斐波0.786"
else:
return None, "加仓方式无效"
mode = (add_mode or ADD_MODE_MARKET).strip().lower()
mark = float(mark_price or add_price or 0)
if mark <= 0:
return None, "需要有效参考价"
sl = float(new_stop_loss)
tp = float(initial_take_profit)
if sl <= 0 or tp <= 0:
return None, "止损/止盈无效"
risk_budget = float(capital_base) * float(risk_percent) / 100.0
entry_add = mark
mode_label = add_mode_label(mode)
trigger_price = mark
is_pending = mode in PENDING_MODES
if mode == ADD_MODE_MARKET:
entry_add = mark
elif mode in FIB_MODES:
if limit_price and float(limit_price) > 0:
entry_add = float(limit_price)
trigger_price = entry_add
elif fib_upper is not None and fib_lower is not None:
entry_add, err = fib_limit_entry(direction, float(fib_upper), float(fib_lower), mode)
if err:
return None, err
trigger_price = entry_add
else:
return None, "斐波须填触发价或上沿/下沿"
elif mode == ADD_MODE_BREAKOUT:
if not breakthrough_price or float(breakthrough_price) <= 0:
return None, "须填写突破价"
entry_add = float(breakthrough_price)
trigger_price = entry_add
else:
return None, "加仓方式无效"
geom_err = validate_roll_geometry(
direction, mode, sl,
mark_price=mark,
limit_price=trigger_price if mode in FIB_MODES else None,
breakthrough_price=trigger_price if mode == ADD_MODE_BREAKOUT else None,
at_trigger=at_trigger,
)
if geom_err:
return None, geom_err
budget = float(risk_budget)
if budget <= 0:
return None, "固定金额无效"
q2, err = solve_add_lots_for_total_risk(
direction, qty_existing, entry_existing, entry_add, sl, risk_budget, mult
direction, qty_existing, entry_existing, entry_add, sl, budget, mult,
)
if err:
return None, err
@@ -150,15 +338,22 @@ def preview_roll(
return {
"symbol": symbol,
"direction": direction,
"add_mode": mode,
"add_mode_label": mode_label,
"is_pending": is_pending,
"add_price": round(entry_add, 4),
"trigger_price": round(trigger_price, 4),
"limit_price": round(trigger_price, 4) if mode in FIB_MODES else None,
"breakthrough_price": round(trigger_price, 4) if mode == ADD_MODE_BREAKOUT else None,
"new_stop_loss": round(sl, 4),
"initial_take_profit": tp,
"risk_percent": float(risk_percent),
"risk_budget": round(budget, 2),
"fixed_amount": round(budget, 2),
"add_lots": q2,
"qty_after": int(new_qty),
"avg_entry_after": round(new_avg, 4),
"loss_at_sl": round(loss_at_sl, 2),
"reward_at_tp": round(reward_at_tp, 2),
"legs_done": legs_done,
"mark_price": round(mark, 4),
}, None
+151
View File
@@ -0,0 +1,151 @@
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""顺势滚仓程序监控:突破 pending 腿触价成交、外部平仓同步。"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Any, Callable, Optional
from zoneinfo import ZoneInfo
from contract_specs import get_contract_spec
from strategy.strategy_roll_lib import (
ADD_MODE_BREAKOUT,
FIB_MODES,
LEG_STATUS_CANCELLED,
LEG_STATUS_FILLED,
LEG_STATUS_INVALIDATED,
LEG_STATUS_PENDING,
detect_mark_cross,
preview_roll,
)
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
def _now() -> str:
return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")
def roll_sync_after_external_close(conn, *, monitor_id: int) -> None:
"""手动平仓或监控结案后关闭滚仓组并清除 pending 腿。"""
grp = conn.execute(
"SELECT id FROM roll_groups WHERE order_monitor_id=? AND status='active'",
(int(monitor_id),),
).fetchone()
if not grp:
return
gid = int(grp["id"])
conn.execute(
"UPDATE roll_legs SET status=? WHERE roll_group_id=? AND status=?",
(LEG_STATUS_CANCELLED, gid, LEG_STATUS_PENDING),
)
conn.execute(
"UPDATE roll_groups SET status='closed', updated_at=? WHERE id=?",
(_now(), gid),
)
def cancel_roll_leg(conn, leg_id: int) -> tuple[bool, str]:
row = conn.execute(
"SELECT * FROM roll_legs WHERE id=? AND status=?",
(int(leg_id), LEG_STATUS_PENDING),
).fetchone()
if not row:
return False, "仅可删除监控中的腿"
conn.execute(
"UPDATE roll_legs SET status=? WHERE id=?",
(LEG_STATUS_CANCELLED, int(leg_id)),
)
return True, "已删除"
def check_roll_monitors(
conn,
*,
get_mark_price_fn: Callable[[str], Optional[float]],
fill_roll_leg_fn: Callable[[dict, dict, dict, dict], tuple[bool, str]],
is_trading_session_fn: Callable[[], bool],
get_risk_budget_fn: Callable[[], float],
) -> None:
"""扫描 pending 滚仓腿,标记价穿越则重算手数并市价成交。"""
if not is_trading_session_fn():
return
rows = conn.execute(
"""SELECT l.*, g.order_monitor_id, g.symbol, g.direction, g.initial_take_profit,
g.risk_percent, g.leg_count AS group_leg_count,
m.lots AS mon_lots, m.entry_price AS mon_entry, m.take_profit AS mon_tp,
m.status AS mon_status
FROM roll_legs l
JOIN roll_groups g ON g.id = l.roll_group_id
JOIN trade_order_monitors m ON m.id = g.order_monitor_id
WHERE l.status=? AND g.status='active' AND m.status='active'""",
(LEG_STATUS_PENDING,),
).fetchall()
for raw in rows:
leg = dict(raw)
if (leg.get("mon_status") or "").strip().lower() != "active":
_invalidate_leg(conn, leg, "监控已结束")
continue
sym = (leg.get("symbol") or "").strip()
mark = get_mark_price_fn(sym)
if not mark or mark <= 0:
continue
prev_mark = float(leg.get("last_mark_price") or mark)
mode = (leg.get("add_mode") or "").strip().lower()
trigger = float(leg.get("limit_price") or leg.get("breakthrough_price") or 0)
direction = (leg.get("direction") or "long").strip().lower()
if mode in FIB_MODES or mode == ADD_MODE_BREAKOUT:
if not detect_mark_cross(direction, mode, prev_mark, mark, trigger):
conn.execute(
"UPDATE roll_legs SET last_mark_price=? WHERE id=?",
(float(mark), int(leg["id"])),
)
continue
mon = {
"id": leg["order_monitor_id"],
"symbol": sym,
"direction": direction,
"lots": leg["mon_lots"],
"entry_price": leg["mon_entry"],
"take_profit": leg["mon_tp"] or leg["initial_take_profit"],
}
grp = {
"id": leg["roll_group_id"],
"order_monitor_id": leg["order_monitor_id"],
"leg_count": leg.get("group_leg_count") or 0,
"risk_percent": leg.get("risk_percent"),
}
preview, err = preview_roll(
direction=direction,
symbol=sym,
qty_existing=float(leg["mon_lots"] or 0),
entry_existing=float(leg["mon_entry"] or 0),
initial_take_profit=float(leg["mon_tp"] or leg["initial_take_profit"] or 0),
add_mode=mode,
new_stop_loss=float(leg["new_stop_loss"] or 0),
risk_budget=float(leg.get("risk_percent") or 0) or get_risk_budget_fn(),
mult=int(get_contract_spec(sym).get("mult") or 1),
mark_price=mark,
limit_price=trigger if mode in FIB_MODES else None,
breakthrough_price=trigger if mode == ADD_MODE_BREAKOUT else None,
legs_done=int(leg.get("group_leg_count") or 0),
at_trigger=True,
)
if err or not preview:
_invalidate_leg(conn, leg, err or "触发时无法加仓")
continue
ok, msg = fill_roll_leg_fn(mon, grp, leg, preview)
if not ok:
logger.warning("roll leg fill failed #%s: %s", leg.get("id"), msg)
def _invalidate_leg(conn, leg: dict, reason: str) -> None:
conn.execute(
"UPDATE roll_legs SET status=?, invalidated_reason=? WHERE id=?",
(LEG_STATUS_INVALIDATED, (reason or "")[:200], int(leg["id"])),
)