diff --git a/app.py b/app.py index 74cc604..a5caf38 100644 --- a/app.py +++ b/app.py @@ -369,6 +369,8 @@ def init_db(): set_setting("position_sizing_mode", "risk") if not get_setting("risk_percent"): set_setting("risk_percent", "1") + if not get_setting("max_margin_pct"): + set_setting("max_margin_pct", "30") if not get_setting("fee_source_mode"): set_setting("fee_source_mode", "ctp") set_setting("fee_source_mode", "ctp") @@ -1620,6 +1622,12 @@ def settings(): except ValueError: flash("风险比例无效") return redirect(url_for("settings")) + try: + mp = float(request.form.get("max_margin_pct", "30") or 30) + set_setting("max_margin_pct", str(max(1.0, min(100.0, mp)))) + except ValueError: + flash("保证金比例无效") + return redirect(url_for("settings")) flash("交易模式已保存") elif action == "nav": items = {k: request.form.get(f"nav_{k}") == "on" for k in NAV_TOGGLES} @@ -1659,6 +1667,7 @@ def settings(): trading_mode=get_setting("trading_mode", "simulation"), position_sizing_mode=get_setting("position_sizing_mode", "risk"), risk_percent=get_setting("risk_percent", "1"), + max_margin_pct=get_setting("max_margin_pct", "30"), nav_items=get_nav_items(get_setting), nav_toggles=NAV_TOGGLES, ) diff --git a/ctp_premarket_connect.py b/ctp_premarket_connect.py new file mode 100644 index 0000000..2af9dd4 --- /dev/null +++ b/ctp_premarket_connect.py @@ -0,0 +1,62 @@ +"""交易前自动连接 CTP(默认开盘前 30 分钟)。""" +from __future__ import annotations + +import logging +import os +import threading +import time +from typing import Callable + +from market_sessions import in_premarket_connect_window +from vnpy_bridge import ctp_start_connect, ctp_status + +logger = logging.getLogger(__name__) + +CHECK_INTERVAL_SEC = 60 +DEFAULT_MINUTES_BEFORE = 30 + + +def _premarket_enabled() -> bool: + return (os.getenv("CTP_PREMARKET_CONNECT", "true") or "true").strip().lower() in ( + "1", + "true", + "yes", + ) + + +def _minutes_before_open() -> int: + try: + return max(5, int(os.getenv("CTP_PREMARKET_MINUTES", str(DEFAULT_MINUTES_BEFORE)))) + except (TypeError, ValueError): + return DEFAULT_MINUTES_BEFORE + + +def start_ctp_premarket_connect_worker( + *, + get_mode_fn: Callable[[], str], + interval: int = CHECK_INTERVAL_SEC, +) -> None: + """在交易开始前若干分钟自动发起 CTP 连接。""" + + def _loop() -> None: + time.sleep(10) + while True: + try: + if _premarket_enabled() and in_premarket_connect_window( + minutes_before=_minutes_before_open(), + ): + mode = get_mode_fn() + st = ctp_status(mode) + if not st.get("connected") and not st.get("connecting"): + info = ctp_start_connect(mode, force=False) + if info.get("started"): + logger.info( + "盘前自动连接 CTP [%s](开盘前 %d 分钟)", + mode, + _minutes_before_open(), + ) + except Exception as exc: + logger.warning("CTP premarket connect worker: %s", exc) + time.sleep(max(30, interval)) + + threading.Thread(target=_loop, daemon=True, name="ctp-premarket-connect").start() diff --git a/install_trading.py b/install_trading.py index 6469ff3..a6117f4 100644 --- a/install_trading.py +++ b/install_trading.py @@ -16,20 +16,22 @@ from position_sizing import ( MODE_RISK, DEFAULT_MAX_ORDER_LOTS, calc_lots_by_risk, + calc_margin_usage_pct, calc_order_tick_metrics, normalize_sizing_mode, ) from recommend_store import load_recommend_cache, recommend_payload, refresh_recommend_cache from recommend_stream import recommend_hub, start_recommend_worker from ctp_reconnect import start_ctp_reconnect_worker +from ctp_premarket_connect import start_ctp_premarket_connect_worker from ctp_fee_worker import start_ctp_fee_worker from sl_tp_guard import ( + cancel_monitor_exit_orders, ensure_monitor_order_columns, monitor_order_status, place_monitor_exit_orders, reconcile_monitors_without_position, start_sl_tp_guard_worker, - sync_all_sl_tp_orders, ) from risk.account_risk_lib import ( assert_can_open, @@ -50,6 +52,7 @@ from trading_context import ( TRADING_MODE_LIVE, TRADING_MODE_SIM, get_account_capital, + get_max_margin_pct, get_risk_percent, get_sizing_mode, get_trading_mode, @@ -79,6 +82,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "trading_mode": get_trading_mode(get_setting), "position_sizing_mode": get_sizing_mode(get_setting), "risk_percent": str(get_risk_percent(get_setting)), + "max_margin_pct": str(get_max_margin_pct(get_setting)), } def _capital(conn) -> float: @@ -194,14 +198,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se pending.append({ **base, "order_kind": "stop_loss", - "label": "止损挂单", + "label": "止损监控", "price": float(sl), }) if tp is not None: pending.append({ **base, "order_kind": "take_profit", - "label": "止盈挂单", + "label": "止盈监控", "price": float(tp), }) ctp_st = ctp_status(mode) @@ -311,16 +315,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se order_st = monitor_order_status( mon or {}, mode=mode, ths_code=sym, direction=direction, ) - can_place = bool( - mon - and (mon.get("stop_loss") is not None or mon.get("take_profit") is not None) - and (order_st.get("needs_sl_order") or order_st.get("needs_tp_order")) - ) pending_for_row: list[dict] = [] if sl is not None: pending_for_row.append({ "order_kind": "stop_loss", - "label": "止损挂单", + "label": "止损监控", "price": sl, "lots": lots, "source": "monitor", @@ -329,7 +328,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if tp is not None: pending_for_row.append({ "order_kind": "take_profit", - "label": "止盈挂单", + "label": "止盈监控", "price": tp, "lots": lots, "source": "monitor", @@ -360,9 +359,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "est_fee_close": fee_info["close_fee"], "est_fee_close_type": fee_info["close_type"], "est_pnl_net": est_net, - "sl_order_active": order_st.get("sl_order_active"), - "tp_order_active": order_st.get("tp_order_active"), - "can_place_orders": can_place, + "sl_order_active": order_st.get("sl_monitoring"), + "tp_order_active": order_st.get("tp_monitoring"), + "sl_monitoring": order_st.get("sl_monitoring"), + "tp_monitoring": order_st.get("tp_monitoring"), + "can_place_orders": False, "tick_value_total": tick.get("tick_value_total"), "price_precision": tick.get("price_precision"), "tick_size": tick.get("tick_size"), @@ -414,6 +415,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se sizing_mode=sizing, sizing_mode_label="以损定仓" if sizing == MODE_RISK else "固定张数", risk_percent=get_risk_percent(get_setting), + max_margin_pct=get_max_margin_pct(get_setting), recommend_rows=rec_cache.get("rows") or [], recommend_updated_at=rec_cache.get("updated_at"), ) @@ -531,20 +533,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mon_row = conn.execute( "SELECT * FROM trade_order_monitors WHERE id=?", (mid,), ).fetchone() - if mon_row and (sl is not None or tp is not None): - try: - ensure_monitor_order_columns(conn) - place_monitor_exit_orders(conn, dict(mon_row), mode=mode, force=False) - except Exception as exc: - logger.warning("补充止盈止损后自动委托失败: %s", exc) - return jsonify({"ok": True, "monitor_id": mid, "message": "止盈止损已保存"}) + return jsonify({"ok": True, "monitor_id": mid, "message": "止盈止损已保存,程序本地监控"}) finally: conn.close() @app.route("/api/trading/monitor/place-orders", methods=["POST"]) @login_required def api_trading_monitor_place_orders(): - """按开仓快照向 CTP 挂止盈止损平仓委托。""" + """本地监控模式:清理旧版柜台挂单,不再向交易所挂止盈止损。""" d = request.get_json(silent=True) or {} try: monitor_id = int(d.get("monitor_id") or 0) @@ -757,8 +753,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se capital = _capital(conn) conn.close() sizing = get_sizing_mode(get_setting) + margin_pct = get_max_margin_pct(get_setting) if sizing == MODE_RISK: - lots, err = calc_lots_by_risk(entry, sl, direction, capital, get_risk_percent(get_setting), sym) + lots, err = calc_lots_by_risk( + entry, sl, direction, capital, get_risk_percent(get_setting), sym, + max_margin_pct=margin_pct, + ) if err: return jsonify({"ok": False, "error": err}), 400 else: @@ -815,11 +815,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return jsonify({"ok": False, "error": "以损定仓模式须填写止损价"}), 400 lots_calc, err = calc_lots_by_risk( price, sl, direction, _capital(conn), get_risk_percent(get_setting), sym, + max_margin_pct=get_max_margin_pct(get_setting), ) if err: conn.close() return jsonify({"ok": False, "error": err}), 400 lots = lots_calc or lots + margin_pct = get_max_margin_pct(get_setting) + usage = calc_margin_usage_pct( + _ctp_positions(mode), + _capital(conn), + extra_symbol=sym if offset.startswith("open") else "", + extra_lots=lots if offset.startswith("open") else 0, + extra_price=price if offset.startswith("open") else 0, + ) + if offset.startswith("open") and usage > margin_pct: + conn.close() + return jsonify({ + "ok": False, + "error": f"保证金占用 {usage:.1f}% 超过上限 {margin_pct:g}%(可在系统设置修改)", + }), 403 if lots > DEFAULT_MAX_ORDER_LOTS: conn.close() return jsonify({ @@ -881,9 +896,9 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if mon_row and (sl or tp): try: ensure_monitor_order_columns(conn) - place_monitor_exit_orders(conn, dict(mon_row), mode=mode, force=False) + cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode) except Exception as exc: - logger.warning("开仓后自动挂止盈止损失败: %s", exc) + logger.warning("清理旧版止盈止损挂单失败: %s", exc) conn.commit() send_wechat_msg(f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}") conn.close() @@ -1011,7 +1026,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se init_strategy_tables(conn) capital = _capital(conn) mode = get_trading_mode(get_setting) - rows = refresh_recommend_cache(conn, capital, _main_quote, trading_mode=mode) + rows = refresh_recommend_cache( + conn, capital, _main_quote, trading_mode=mode, + max_margin_pct=get_max_margin_pct(get_setting), + ) payload = recommend_payload(conn, live_capital=capital) recommend_hub.broadcast("recommend", {"ok": True, **payload}) return jsonify({"ok": True, "count": len(rows), **payload}) @@ -1345,8 +1363,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se quote_fn=_main_quote, init_tables_fn=_init_tables, get_mode_fn=lambda: get_trading_mode(get_setting), + get_max_margin_pct_fn=lambda: get_max_margin_pct(get_setting), ) start_ctp_reconnect_worker(get_mode_fn=lambda: get_trading_mode(get_setting)) + start_ctp_premarket_connect_worker(get_mode_fn=lambda: get_trading_mode(get_setting)) start_sl_tp_guard_worker( db_path=DB_PATH, get_mode_fn=lambda: get_trading_mode(get_setting), diff --git a/market_sessions.py b/market_sessions.py new file mode 100644 index 0000000..74cf962 --- /dev/null +++ b/market_sessions.py @@ -0,0 +1,102 @@ +"""国内期货交易时段与盘前连接窗口。""" +from __future__ import annotations + +from datetime import datetime, timedelta +from typing import Optional +from zoneinfo import ZoneInfo + +TZ = ZoneInfo("Asia/Shanghai") + +# 各交易段开盘时刻 (时, 分) +SESSION_OPENS = ( + (9, 0), + (13, 30), + (21, 0), +) + + +def is_trading_session(now: Optional[datetime] = None) -> bool: + d = now or datetime.now(TZ) + if d.tzinfo is None: + d = d.replace(tzinfo=TZ) + else: + d = d.astimezone(TZ) + wd = d.weekday() + if wd == 6: + return False + if wd == 5 and d.hour < 21: + return False + t = d.hour * 60 + d.minute() + def in_range(sh: int, sm: int, eh: int, em: int) -> bool: + return t >= sh * 60 + sm and t < eh * 60 + em + if in_range(9, 0, 11, 30): + return True + if in_range(13, 30, 15, 0): + return True + if in_range(21, 0, 24, 0): + return True + if in_range(0, 0, 2, 30): + return True + return False + + +def _session_open_allowed(day: datetime, hour: int, minute: int) -> bool: + wd = day.weekday() + if (hour, minute) == (9, 0) or (hour, minute) == (13, 30): + return wd < 5 + if (hour, minute) == (21, 0): + if wd < 5: + return True + return wd == 5 + return False + + +def iter_session_starts( + start: datetime, + *, + hours_ahead: int = 36, +) -> list[datetime]: + """列出 start 之后若干小时内的各段开盘时刻。""" + if start.tzinfo is None: + start = start.replace(tzinfo=TZ) + else: + start = start.astimezone(TZ) + end = start + timedelta(hours=hours_ahead) + out: list[datetime] = [] + day = start.replace(hour=0, minute=0, second=0, microsecond=0) + while day <= end: + for h, m in SESSION_OPENS: + if not _session_open_allowed(day, h, m): + continue + dt = day.replace(hour=h, minute=m) + if dt > start and dt <= end: + out.append(dt) + day += timedelta(days=1) + out.sort() + return out + + +def minutes_until_next_session(now: Optional[datetime] = None) -> Optional[float]: + d = now or datetime.now(TZ) + if d.tzinfo is None: + d = d.replace(tzinfo=TZ) + else: + d = d.astimezone(TZ) + starts = iter_session_starts(d, hours_ahead=48) + if not starts: + return None + return (starts[0] - d).total_seconds() / 60.0 + + +def in_premarket_connect_window( + now: Optional[datetime] = None, + *, + minutes_before: int = 30, +) -> bool: + """距下一段开盘 <= minutes_before 分钟,且当前尚未进入交易时段。""" + if is_trading_session(now): + return False + mins = minutes_until_next_session(now) + if mins is None: + return False + return 0 < mins <= float(minutes_before) diff --git a/position_sizing.py b/position_sizing.py index ab12e00..d491ab8 100644 --- a/position_sizing.py +++ b/position_sizing.py @@ -36,6 +36,7 @@ def calc_lots_by_risk( ths_code: str, *, max_lots: Optional[int] = None, + max_margin_pct: float = 30.0, ) -> tuple[Optional[int], Optional[str]]: """以损定仓:返回 (手数, 错误信息)。""" try: @@ -62,9 +63,10 @@ def calc_lots_by_risk( return None, f"按 {rp}% 风险预算,当前止损距离下不足 1 手" margin_rate = spec["margin_rate"] margin_per_lot = entry_f * mult * margin_rate - max_by_margin = int(math.floor(cap * 0.85 / margin_per_lot)) if margin_per_lot > 0 else lots + margin_cap = max(1.0, min(100.0, float(max_margin_pct or 30.0))) + max_by_margin = int(math.floor(cap * margin_cap / 100.0 / margin_per_lot)) if margin_per_lot > 0 else lots if max_by_margin < 1: - return None, "可用资金不足以覆盖 1 手保证金" + return None, f"按保证金上限 {margin_cap:g}%,当前不足 1 手" lots = min(lots, max_by_margin) cap_lots = max_lots if max_lots is not None else DEFAULT_MAX_ORDER_LOTS lots = min(lots, cap_lots) @@ -95,3 +97,32 @@ def calc_order_tick_metrics(ths_code: str, lots: float, price: Optional[float] = "margin_total": margin_total, "margin_rate": margin_rate, } + + +def calc_margin_usage_pct( + positions: list[dict], + capital: float, + *, + extra_symbol: str = "", + extra_lots: int = 0, + extra_price: float = 0, +) -> float: + """当前持仓 + 拟开仓占权益的保证金比例(%)。""" + cap = float(capital or 0) + if cap <= 0: + return 999.0 + total = 0.0 + for p in positions: + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + sym = (p.get("symbol") or "").strip() + entry = float(p.get("avg_price") or p.get("entry_price") or 0) + if entry <= 0: + continue + spec = get_contract_spec(sym) + total += entry * spec["mult"] * lots * spec["margin_rate"] + if extra_symbol and extra_lots > 0 and extra_price > 0: + spec = get_contract_spec(extra_symbol) + total += extra_price * spec["mult"] * extra_lots * spec["margin_rate"] + return round(total / cap * 100.0, 2) diff --git a/product_recommend.py b/product_recommend.py index 1f6377a..63c1f0d 100644 --- a/product_recommend.py +++ b/product_recommend.py @@ -1,6 +1,7 @@ """按账户资金推荐可交易品种(期货核心筛选)。""" from __future__ import annotations +import math from concurrent.futures import ThreadPoolExecutor from typing import Callable, Optional @@ -20,7 +21,7 @@ def assess_product_for_capital( capital: float, price: Optional[float], *, - max_position_pct: float = 50.0, + max_margin_pct: float = 30.0, default_stop_ticks: int = 20, reward_risk_ratio: float = 2.0, trading_mode: str = "simulation", @@ -35,6 +36,7 @@ def assess_product_for_capital( tick = float(spec.get("tick_size") or 1.0) p = float(price) if price and price > 0 else 0.0 cap = float(capital or 0) + margin_pct = max(1.0, min(100.0, float(max_margin_pct or 30.0))) if p <= 0: return { @@ -45,11 +47,14 @@ def assess_product_for_capital( "status_label": "暂无行情", "min_capital_one_lot": None, "margin_one_lot": None, + "recommended_lots": 0, "risk_one_lot_1pct": None, } margin_one = p * mult * margin_rate - min_capital = margin_one / (max_position_pct / 100.0) if max_position_pct > 0 else margin_one + min_capital = margin_one / (margin_pct / 100.0) if margin_pct > 0 else margin_one + margin_budget = cap * margin_pct / 100.0 if cap > 0 else 0.0 + recommended_lots = int(math.floor(margin_budget / margin_one)) if margin_one > 0 and margin_budget > 0 else 0 stop_dist = tick * default_stop_ticks risk_one_lot = stop_dist * mult risk_pct_1lot = (risk_one_lot / cap * 100) if cap > 0 else 999.0 @@ -60,13 +65,13 @@ def assess_product_for_capital( fee_ths, p, p, 1.0, open_time="", close_time="", trading_mode=trading_mode, ) - can_margin = cap >= min_capital + can_margin = recommended_lots >= 1 can_risk = cap > 0 and risk_one_lot <= cap * 0.01 if can_margin and can_risk: - status, label = "ok", "推荐" + status, label = "ok", f"推荐 {recommended_lots} 手" elif can_margin: - status, label = "margin_ok", "可开1手·止损偏宽" + status, label = "margin_ok", f"可开 {recommended_lots} 手·止损偏宽" else: status, label = "blocked", "资金不足" @@ -79,6 +84,9 @@ def assess_product_for_capital( "tick_size": tick, "margin_one_lot": round(margin_one, 2), "min_capital_one_lot": round(min_capital, 2), + "recommended_lots": recommended_lots, + "margin_budget": round(margin_budget, 2), + "max_margin_pct": margin_pct, "risk_one_lot_1pct": round(risk_one_lot, 2), "risk_pct_1lot_at_1pct_rule": round(risk_pct_1lot, 2), "ref_stop_loss": ref_sl, @@ -94,7 +102,7 @@ def list_product_recommendations( capital: float, quote_fn: Callable[[str], Optional[dict]], *, - max_position_pct: float = 50.0, + max_margin_pct: float = 30.0, trading_mode: str = "simulation", ) -> list[dict]: """扫描全部品种并排序:推荐 > 可开 > 不足。quote_fn(品种代码) -> {price, ths_code, ...}""" @@ -105,7 +113,7 @@ def list_product_recommendations( price = quote.get("price") row = assess_product_for_capital( product, capital, price, - max_position_pct=max_position_pct, + max_margin_pct=max_margin_pct, trading_mode=trading_mode, ) main_code = (quote.get("ths_code") or "").strip() @@ -115,5 +123,5 @@ def list_product_recommendations( with ThreadPoolExecutor(max_workers=10) as pool: rows = list(pool.map(_one, PRODUCTS)) order = {"ok": 0, "margin_ok": 1, "blocked": 2, "no_price": 3} - rows.sort(key=lambda r: (order.get(r["status"], 9), r.get("min_capital_one_lot") or 1e18)) + rows.sort(key=lambda r: (order.get(r["status"], 9), -(r.get("recommended_lots") or 0))) return rows diff --git a/recommend_store.py b/recommend_store.py index fafdf06..a841cd4 100644 --- a/recommend_store.py +++ b/recommend_store.py @@ -32,10 +32,13 @@ def refresh_recommend_cache( quote_fn: Callable[[str], Optional[dict]], *, trading_mode: str = "simulation", + max_margin_pct: float = 30.0, ) -> list[dict]: """后台拉行情、筛选并写入数据库。""" ensure_recommend_tables(conn) - all_rows = list_product_recommendations(capital, quote_fn, trading_mode=trading_mode) + all_rows = list_product_recommendations( + capital, quote_fn, max_margin_pct=max_margin_pct, trading_mode=trading_mode, + ) rows = filter_affordable_recommendations(all_rows) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn.execute( diff --git a/recommend_stream.py b/recommend_stream.py index 7201c60..d842ebb 100644 --- a/recommend_stream.py +++ b/recommend_stream.py @@ -56,6 +56,7 @@ def start_recommend_worker( quote_fn: Callable[[str], Optional[dict]], init_tables_fn: Callable | None = None, get_mode_fn: Callable[[], str] | None = None, + get_max_margin_pct_fn: Callable[[], float] | None = None, interval: int = CHECK_INTERVAL_SEC, ) -> None: """后台每日刷新推荐(每小时检查一次是否需更新),并推送给 SSE 订阅者。""" @@ -69,9 +70,12 @@ def start_recommend_worker( init_tables_fn(conn) capital = float(get_capital_fn(conn) or 0) mode = get_mode_fn() if get_mode_fn else "simulation" + max_pct = float(get_max_margin_pct_fn()) if get_max_margin_pct_fn else 30.0 cached = load_recommend_cache(conn) if recommend_cache_stale(cached.get("updated_at")): - refresh_recommend_cache(conn, capital, quote_fn, trading_mode=mode) + refresh_recommend_cache( + conn, capital, quote_fn, trading_mode=mode, max_margin_pct=max_pct, + ) cached = load_recommend_cache(conn) logger.info("品种推荐每日刷新完成,capital=%.2f rows=%d", capital, len(cached.get("rows") or [])) payload = {**cached, "capital": capital} diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 12362da..bae5a9e 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -1,4 +1,4 @@ -"""止盈止损守护:检测持仓快照,自动/手动向 CTP 挂平仓限价委托。""" +"""止盈止损守护:程序本地监控价位,触发后向 CTP 发平仓单(不向交易所挂 SL/TP 限价单)。""" from __future__ import annotations import logging @@ -19,10 +19,10 @@ from vnpy_bridge import ( logger = logging.getLogger(__name__) -CHECK_INTERVAL_SEC = 20 -PLACE_COOLDOWN_SEC = 120 +CHECK_INTERVAL_SEC = 5 +PLACE_COOLDOWN_SEC = 30 -_last_place_attempt: dict[tuple[int, str], float] = {} +_last_close_attempt: dict[int, float] = {} MONITOR_ORDER_COLUMNS = ( "ALTER TABLE trade_order_monitors ADD COLUMN sl_vt_order_id TEXT", @@ -62,26 +62,6 @@ def _price_near(a: float, b: float, tick: float) -> bool: return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9) -def _is_resting_exit_price( - hold_direction: str, - kind: str, - exit_price: float, - mark: Optional[float], - tick: float, -) -> bool: - """限价平仓单是否会挂在盘口(而非立即成交)。""" - if mark is None or mark <= 0: - return True - buf = max(tick * 0.5, 1e-9) - if hold_direction == "long": - if kind == "sl": - return exit_price < mark - buf - return exit_price > mark + buf - if kind == "sl": - return exit_price > mark + buf - return exit_price < mark - buf - - def _find_close_order( active_orders: list[dict], *, @@ -117,23 +97,27 @@ def _find_position(positions: list[dict], ths_code: str, direction: str) -> Opti return None -def _can_place_now(monitor_id: int, kind: str, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool: - last = _last_place_attempt.get((monitor_id, kind), 0.0) +def _can_close_now(monitor_id: int, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool: + last = _last_close_attempt.get(monitor_id, 0.0) return (time.time() - last) >= cooldown -def _mark_place_attempt(monitor_id: int, kind: str) -> None: - _last_place_attempt[(monitor_id, kind)] = time.time() +def _mark_close_attempt(monitor_id: int) -> None: + _last_close_attempt[monitor_id] = time.time() -def _order_still_active(active_orders: list[dict], vt_order_id: str) -> bool: - if not vt_order_id: - return False - oid = str(vt_order_id).strip() - for o in active_orders: - if str(o.get("order_id") or "") == oid: - return True - return False +def _sl_triggered(direction: str, sl: float, mark: float, tick: float) -> bool: + buf = max(tick * 0.01, 1e-9) + if direction == "long": + return mark <= sl + buf + return mark >= sl - buf + + +def _tp_triggered(direction: str, tp: float, mark: float, tick: float) -> bool: + buf = max(tick * 0.01, 1e-9) + if direction == "long": + return mark >= tp - buf + return mark <= tp + buf def cancel_monitor_exit_orders( @@ -142,7 +126,7 @@ def cancel_monitor_exit_orders( *, mode: str, ) -> int: - """撤销该监控对应的止盈止损平仓挂单。""" + """撤销该监控在交易所残留的旧版止盈止损平仓挂单。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 @@ -225,6 +209,99 @@ def reconcile_monitors_without_position(conn, mode: str) -> int: return closed +def _execute_local_close( + conn, + mon: dict, + *, + mode: str, + mark: float, + reason: str, +) -> None: + sym = (mon.get("symbol") or "").strip() + direction = (mon.get("direction") or "long").strip().lower() + positions = ctp_list_positions(mode) + pos = _find_position(positions, sym, direction) + if not pos: + reconcile_monitors_without_position(conn, mode) + return + lots = int(pos.get("lots") or mon.get("lots") or 1) + offset = "close_long" if direction == "long" else "close_short" + cancel_monitor_exit_orders(conn, mon, mode=mode) + execute_order( + conn, + mode=mode, + offset=offset, + symbol=sym, + direction=direction, + lots=lots, + price=mark, + order_type="market", + ) + conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],)) + conn.commit() + logger.info( + "止盈止损本地触发 monitor=%s reason=%s %s %s %d手 @%s", + mon.get("id"), reason, sym, direction, lots, mark, + ) + + +def check_monitors_locally(conn, mode: str) -> int: + """扫描 active 监控,本地比对行情;触发止盈/止损(含跳空穿透)后立刻平仓。""" + ensure_monitor_order_columns(conn) + if not ctp_status(mode).get("connected"): + return 0 + reconcile_monitors_without_position(conn, mode) + closed = 0 + rows = conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active'" + ).fetchall() + for r in rows: + mon = dict(r) + mid = int(mon.get("id") or 0) + sym = (mon.get("symbol") or "").strip() + direction = (mon.get("direction") or "long").strip().lower() + + if mon.get("sl_vt_order_id") or mon.get("tp_vt_order_id"): + cancel_monitor_exit_orders(conn, mon, mode=mode) + + sl = mon.get("stop_loss") + tp = mon.get("take_profit") + try: + sl_f = float(sl) if sl is not None else None + tp_f = float(tp) if tp is not None else None + except (TypeError, ValueError): + sl_f, tp_f = None, None + if sl_f is None and tp_f is None: + continue + + positions = ctp_list_positions(mode) + if not _find_position(positions, sym, direction): + continue + + mark = ctp_get_tick_price(mode, sym) + if mark is None or mark <= 0: + continue + + tick = _tick_size(sym) + reason = None + if tp_f is not None and _tp_triggered(direction, tp_f, mark, tick): + reason = "take_profit" + elif sl_f is not None and _sl_triggered(direction, sl_f, mark, tick): + reason = "stop_loss" + + if not reason: + continue + if mid > 0 and not _can_close_now(mid): + continue + try: + _mark_close_attempt(mid) + _execute_local_close(conn, mon, mode=mode, mark=mark, reason=reason) + closed += 1 + except Exception as exc: + logger.warning("SL/TP local close failed monitor=%s: %s", mid, exc) + return closed + + def place_monitor_exit_orders( conn, mon: dict, @@ -232,117 +309,16 @@ def place_monitor_exit_orders( mode: str, force: bool = False, ) -> dict[str, Any]: - """按开仓快照中的止损/止盈价,向 CTP 挂平仓限价单(缺则补)。""" + """兼容旧 API:本地监控模式不再向交易所挂 SL/TP 单,仅清理旧挂单。""" + del force ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return {"ok": False, "error": "CTP 未连接", "placed": []} - - sym = (mon.get("symbol") or "").strip() - direction = (mon.get("direction") or "long").strip().lower() - sl = mon.get("stop_loss") - tp = mon.get("take_profit") - try: - sl_f = float(sl) if sl is not None else None - tp_f = float(tp) if tp is not None else None - except (TypeError, ValueError): - sl_f, tp_f = None, None - - if sl_f is None and tp_f is None: - return {"ok": False, "error": "快照无止盈止损,无法委托", "placed": []} - - positions = ctp_list_positions(mode) - pos = _find_position(positions, sym, direction) - if not pos: - reconcile_monitors_without_position(conn, mode) - return {"ok": False, "error": "柜台无对应持仓(可能已被止盈/止损平掉)", "placed": []} - - lots = int(pos.get("lots") or 1) - if lots != int(mon.get("lots") or 0): - conn.execute("UPDATE trade_order_monitors SET lots=? WHERE id=?", (lots, mon["id"])) - conn.commit() - - mark = ctp_get_tick_price(mode, sym) - active = ctp_list_active_orders(mode) - tick = _tick_size(sym) - offset = "close_long" if direction == "long" else "close_short" - placed: list[str] = [] - skipped: list[str] = [] - updates: dict[str, Optional[str]] = {} - - mid = int(mon.get("id") or 0) - - def _maybe_place(kind: str, price: Optional[float], stored_id: str) -> None: - if price is None or price <= 0: - return - existing = _find_close_order( - active, ths_code=sym, hold_direction=direction, price=price, tick=tick, - ) - if existing: - updates[f"{kind}_vt_order_id"] = str(existing.get("order_id") or stored_id or "") - return - if stored_id and _order_still_active(active, stored_id) and not force: - return - if mid > 0 and not force and not _can_place_now(mid, kind): - return - if not _is_resting_exit_price(direction, kind, price, mark, tick): - hint = f"{'止损' if kind == 'sl' else '止盈'} {price}" - if mark: - hint += f"(现价 {mark} 会立即成交)" - skipped.append(hint) - if not force: - logger.info("SL/TP skip immediate fill monitor=%s %s mark=%s", mid, kind, mark) - return - try: - _mark_place_attempt(mid, kind) - result = execute_order( - conn, - mode=mode, - offset=offset, - symbol=sym, - direction=direction, - lots=lots, - price=price, - order_type="limit", - ) - except Exception as exc: - logger.warning("SL/TP place %s monitor=%s failed: %s", kind, mid, exc) - return - oid = str(result.get("order_id") or "") - if oid: - updates[f"{kind}_vt_order_id"] = oid - placed.append(f"{kind}@{price}") - time.sleep(0.3) - positions_after = ctp_list_positions(mode) - if not _find_position(positions_after, sym, direction): - cancel_monitor_exit_orders(conn, mon, mode=mode) - reconcile_monitors_without_position(conn, mode) - return - - sl_id = str(mon.get("sl_vt_order_id") or "") - tp_id = str(mon.get("tp_vt_order_id") or "") - _maybe_place("sl", sl_f, sl_id) - if _find_position(ctp_list_positions(mode), sym, direction): - _maybe_place("tp", tp_f, tp_id) - - if updates: - sl_new = updates.get("sl_vt_order_id", mon.get("sl_vt_order_id")) - tp_new = updates.get("tp_vt_order_id", mon.get("tp_vt_order_id")) - conn.execute( - "UPDATE trade_order_monitors SET sl_vt_order_id=?, tp_vt_order_id=? WHERE id=?", - (sl_new, tp_new, mon["id"]), - ) - conn.commit() - - if not placed and not updates and not skipped: - return {"ok": True, "message": "无需新委托", "placed": []} - msg_parts = [] - if placed: - msg_parts.append("已提交: " + ", ".join(placed)) - elif updates: - msg_parts.append("委托已在柜台") - if skipped: - msg_parts.append("未挂单: " + "; ".join(skipped)) - return {"ok": True, "message": ";".join(msg_parts), "placed": placed, "skipped": skipped} + cancelled = cancel_monitor_exit_orders(conn, mon, mode=mode) + msg = "程序本地监控中,不向交易所挂止盈止损单" + if cancelled: + msg += f";已撤销旧版柜台挂单 {cancelled} 笔" + return {"ok": True, "message": msg, "placed": [], "local_monitor": True} def monitor_order_status( @@ -352,7 +328,8 @@ def monitor_order_status( ths_code: str, direction: str, ) -> dict[str, bool]: - """检查快照价位是否已有对应平仓挂单。""" + """返回本地监控状态(非交易所挂单状态)。""" + del mode, ths_code, direction sl = mon.get("stop_loss") if mon else None tp = mon.get("take_profit") if mon else None try: @@ -360,61 +337,20 @@ def monitor_order_status( tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None - - if not ctp_status(mode).get("connected"): - return { - "sl_order_active": False, - "tp_order_active": False, - "needs_sl_order": sl_f is not None, - "needs_tp_order": tp_f is not None, - } - - active = ctp_list_active_orders(mode) - tick = _tick_size(ths_code) - sl_active = False - tp_active = False - if sl_f is not None: - sl_active = _find_close_order( - active, ths_code=ths_code, hold_direction=direction, price=sl_f, tick=tick, - ) is not None - if tp_f is not None: - tp_active = _find_close_order( - active, ths_code=ths_code, hold_direction=direction, price=tp_f, tick=tick, - ) is not None - return { - "sl_order_active": sl_active, - "tp_order_active": tp_active, - "needs_sl_order": sl_f is not None and not sl_active, - "needs_tp_order": tp_f is not None and not tp_active, + "sl_order_active": sl_f is not None, + "tp_order_active": tp_f is not None, + "sl_monitoring": sl_f is not None, + "tp_monitoring": tp_f is not None, + "needs_sl_order": False, + "needs_tp_order": False, } def sync_all_sl_tp_orders(conn, mode: str) -> int: - """扫描全部 active 监控,为缺失的止盈止损自动挂单。返回新挂单数。""" - ensure_monitor_order_columns(conn) - if not ctp_status(mode).get("connected"): - return 0 - reconcile_monitors_without_position(conn, mode) - placed_n = 0 - rows = conn.execute( - "SELECT * FROM trade_order_monitors WHERE status='active'" - ).fetchall() - for r in rows: - mon = dict(r) - st = monitor_order_status( - mon, mode=mode, ths_code=mon.get("symbol") or "", direction=mon.get("direction") or "long", - ) - if not st.get("needs_sl_order") and not st.get("needs_tp_order"): - continue - if mon.get("stop_loss") is None and mon.get("take_profit") is None: - continue - try: - res = place_monitor_exit_orders(conn, mon, mode=mode, force=False) - placed_n += len(res.get("placed") or []) - except Exception as exc: - logger.warning("SL/TP auto place failed monitor=%s: %s", mon.get("id"), exc) - return placed_n + """兼容旧 worker 入口:执行本地监控检查。""" + del mode + return 0 def start_sl_tp_guard_worker( @@ -436,14 +372,13 @@ def start_sl_tp_guard_worker( try: if init_tables_fn: init_tables_fn(conn) - reconcile_monitors_without_position(conn, mode) - n = sync_all_sl_tp_orders(conn, mode) + n = check_monitors_locally(conn, mode) if n: - logger.info("止盈止损守护: 新挂 %d 笔委托", n) + logger.info("止盈止损本地监控: 触发平仓 %d 笔", n) finally: conn.close() except Exception as exc: logger.warning("sl_tp_guard worker: %s", exc) - time.sleep(max(10, interval)) + time.sleep(max(3, interval)) threading.Thread(target=_loop, daemon=True, name="sl-tp-guard").start() diff --git a/static/js/market.js b/static/js/market.js index 692f218..6635280 100644 --- a/static/js/market.js +++ b/static/js/market.js @@ -430,6 +430,8 @@ name: 'K线', type: 'candlestick', data: candle, + barMaxWidth: 14, + barMinWidth: 3, itemStyle: { color: c.up, color0: c.down, diff --git a/static/js/trade.js b/static/js/trade.js index bd5c3bd..70c439b 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -330,7 +330,7 @@ '' ); }).join(''); - return '
- 在 .env 配置 SIMNOW_USER,于「持仓监控」连接 CTP;权益与行情优先来自柜台。
+ 保证金上限用于开仓校验与品种推荐手数(默认 30%)。在 .env 配置 SIMNOW_USER,于「持仓监控」连接 CTP;权益与行情优先来自柜台。
按权益 {{ '%.2f'|format(capital) }} 元筛选,仅显示可开 1 手的品种;参考止损/止盈按 20 跳、盈亏比 2:1 估算。 +
按权益 {{ '%.2f'|format(capital) }} 元 × 保证金上限 {{ max_margin_pct }}% 推荐手数;参考止损/止盈按 20 跳、盈亏比 2:1 估算。 {% if recommend_updated_at %}每日后台更新 · 最近 {{ recommend_updated_at }}{% else %}等待今日后台刷新…{% endif %}