diff --git a/ctp_ipc_client.py b/ctp_ipc_client.py new file mode 100644 index 0000000..49b28dc --- /dev/null +++ b/ctp_ipc_client.py @@ -0,0 +1,226 @@ +# Copyright (c) 2025-2026 马建军. All rights reserved. +# 专有软件 — 未经授权禁止复制、传播、转售。 +# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md + +"""Local HTTP client for the isolated CTP worker process.""" +from __future__ import annotations + +import json +import os +import time +import urllib.error +import urllib.request +from typing import Any, Optional + + +DEFAULT_BASE_URL = "http://127.0.0.1:6601" +DEFAULT_TIMEOUT_SEC = 2.5 +STATUS_TIMEOUT_SEC = 5.0 +MUTATION_TIMEOUT_SEC = 8.0 + + +class CtpWorkerUnavailable(RuntimeError): + """Raised when the local CTP worker cannot be reached.""" + + +def ctp_role() -> str: + return (os.getenv("QIHUO_CTP_ROLE", "client") or "client").strip().lower() + + +def is_worker_role() -> bool: + return ctp_role() == "worker" + + +def worker_base_url() -> str: + return (os.getenv("QIHUO_CTP_WORKER_URL", DEFAULT_BASE_URL) or DEFAULT_BASE_URL).rstrip("/") + + +def worker_token() -> str: + token = (os.getenv("QIHUO_CTP_WORKER_TOKEN", "") or "").strip() + if token: + return token + # Localhost-only default keeps old deployments working; PM2 sets a shared token. + return "qihuo-local-ctp" + + +def _request( + method: str, + path: str, + payload: Optional[dict[str, Any]] = None, + *, + timeout: float = DEFAULT_TIMEOUT_SEC, +) -> dict[str, Any]: + url = f"{worker_base_url()}{path}" + body = None + headers = { + "Accept": "application/json", + "X-Qihuo-CTP-Token": worker_token(), + } + if payload is not None: + body = json.dumps(payload, ensure_ascii=False).encode("utf-8") + headers["Content-Type"] = "application/json" + req = urllib.request.Request(url, data=body, headers=headers, method=method.upper()) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + raw = resp.read().decode("utf-8", errors="replace") + except (urllib.error.URLError, TimeoutError, OSError) as exc: + raise CtpWorkerUnavailable(f"CTP worker unavailable: {exc}") from exc + if not raw: + return {} + try: + data = json.loads(raw) + except json.JSONDecodeError as exc: + raise CtpWorkerUnavailable(f"CTP worker returned invalid JSON: {raw[:120]}") from exc + if not isinstance(data, dict): + raise CtpWorkerUnavailable("CTP worker returned non-object JSON") + if data.get("ok") is False: + raise RuntimeError(str(data.get("error") or "CTP worker request failed")) + return data + + +def get(path: str, *, timeout: float = DEFAULT_TIMEOUT_SEC) -> dict[str, Any]: + return _request("GET", path, timeout=timeout) + + +def post( + path: str, + payload: Optional[dict[str, Any]] = None, + *, + timeout: float = DEFAULT_TIMEOUT_SEC, +) -> dict[str, Any]: + return _request("POST", path, payload or {}, timeout=timeout) + + +def health() -> dict[str, Any]: + try: + return get("/health", timeout=1.0) + except Exception as exc: + return { + "ok": False, + "worker_online": False, + "error": str(exc), + "ts": time.time(), + } + + +def status(mode: str) -> dict[str, Any]: + try: + data = get(f"/ctp/status?mode={mode}", timeout=STATUS_TIMEOUT_SEC) + return dict(data.get("status") or {}) + except Exception as exc: + return { + "connected": False, + "connecting": False, + "worker_online": False, + "last_error": f"CTP worker 离线或重启中:{exc}", + } + + +def connect(mode: str, *, force: bool = False) -> dict[str, Any]: + data = post( + "/ctp/connect", + {"mode": mode, "force": bool(force)}, + timeout=MUTATION_TIMEOUT_SEC, + ) + return dict(data.get("status") or data) + + +def start_connect(mode: str, *, force: bool = False, scheduled: bool = False) -> dict[str, Any]: + return post( + "/ctp/start_connect", + {"mode": mode, "force": bool(force), "scheduled": bool(scheduled)}, + timeout=MUTATION_TIMEOUT_SEC, + ) + + +def disconnect(*, set_disabled_hint: bool = False) -> None: + post( + "/ctp/disconnect", + {"set_disabled_hint": bool(set_disabled_hint)}, + timeout=MUTATION_TIMEOUT_SEC, + ) + + +def account(mode: str) -> dict[str, Any]: + data = get(f"/ctp/account?mode={mode}") + return dict(data.get("account") or {}) + + +def positions( + mode: str, + *, + refresh_if_empty: bool = True, + refresh_margin: bool = False, +) -> list[dict[str, Any]]: + data = post( + "/ctp/positions", + { + "mode": mode, + "refresh_if_empty": bool(refresh_if_empty), + "refresh_margin": bool(refresh_margin), + }, + ) + return list(data.get("positions") or []) + + +def trades(mode: str, *, refresh: bool = False) -> list[dict[str, Any]]: + data = post("/ctp/trades", {"mode": mode, "refresh": bool(refresh)}) + return list(data.get("trades") or []) + + +def active_orders(mode: str) -> list[dict[str, Any]]: + data = get(f"/ctp/active_orders?mode={mode}") + return list(data.get("orders") or []) + + +def tick_price(mode: str, symbol: str) -> Optional[float]: + data = post("/ctp/tick_price", {"mode": mode, "symbol": symbol}) + value = data.get("price") + return float(value) if value not in (None, "") else None + + +def tick_detail(mode: str, symbol: str) -> dict[str, Any]: + data = post("/ctp/tick_detail", {"mode": mode, "symbol": symbol}) + return dict(data.get("detail") or {}) + + +def estimate_margin_one_lot( + mode: str, + symbol: str, + price: float, + *, + direction: str = "long", +) -> Optional[float]: + data = post( + "/ctp/estimate_margin_one_lot", + {"mode": mode, "symbol": symbol, "price": price, "direction": direction}, + ) + value = data.get("margin") + return float(value) if value not in (None, "") else None + + +def contract_spec(mode: str, symbol: str) -> Optional[dict[str, Any]]: + data = post("/ctp/contract_spec", {"mode": mode, "symbol": symbol}) + spec = data.get("spec") + return dict(spec) if isinstance(spec, dict) else None + + +def send_order(payload: dict[str, Any]) -> dict[str, Any]: + return post("/ctp/order", payload, timeout=MUTATION_TIMEOUT_SEC) + + +def cancel_order(mode: str, vt_orderid: str) -> bool: + data = post( + "/ctp/cancel", + {"mode": mode, "vt_orderid": vt_orderid}, + timeout=MUTATION_TIMEOUT_SEC, + ) + return bool(data.get("cancelled")) + + +def bridge_action(action: str, payload: Optional[dict[str, Any]] = None) -> dict[str, Any]: + return post( + f"/ctp/bridge/{action}", + payload or {}, + timeout=MUTATION_TIMEOUT_SEC, + ) diff --git a/ctp_worker.py b/ctp_worker.py new file mode 100644 index 0000000..f093f87 --- /dev/null +++ b/ctp_worker.py @@ -0,0 +1,494 @@ +# Copyright (c) 2025-2026 马建军. All rights reserved. +# 专有软件 — 未经授权禁止复制、传播、转售。 +# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md + +"""Isolated local CTP worker. + +This process is the only process that should instantiate vn.py / vnpy_ctp. +The Flask web app talks to it through localhost HTTP via ctp_ipc_client.py. +""" +from __future__ import annotations + +import logging +import os +import threading +import time +from typing import Any + +os.environ.setdefault("QIHUO_CTP_ROLE", "worker") + +from flask import Flask, jsonify, request + +from ctp_ipc_client import worker_token +from db_conn import DB_PATH, commit_retry, connect_db +from fee_specs import get_setting, set_setting +from locale_fix import ensure_process_locale +from market_sessions import is_trading_session +from sl_tp_guard import check_sl_tp_on_tick, ensure_monitor_order_columns, start_sl_tp_guard_worker +from strategy.strategy_db import init_strategy_tables +from trading_context import get_account_capital, get_trading_mode, get_trailing_be_tick_buffer +from vnpy_bridge import ( + _ctp_td_lock, + ctp_cancel_order, + ctp_disconnect, + ctp_estimate_margin_one_lot, + ctp_get_account, + ctp_get_tick_detail, + ctp_get_tick_price, + ctp_list_active_orders, + ctp_list_positions, + ctp_list_trades, + ctp_lookup_contract_spec, + ctp_start_connect, + ctp_status, + ctp_try_auto_reconnect, + execute_order, + get_bridge, + set_ctp_connected_callback, + set_position_refresh_callback, + set_tick_quote_callback, + set_tick_sl_tp_callback, + try_init_vnpy, +) + + +logging.basicConfig( + level=os.getenv("LOG_LEVEL", "INFO"), + format="%(asctime)s %(levelname)s [%(name)s] %(message)s", +) +logger = logging.getLogger(__name__) + +app = Flask(__name__) +_started_workers = False +_last_snapshot_ts = 0.0 +_snapshot_lock = threading.Lock() + + +def _json_ok(**payload: Any): + return jsonify({"ok": True, **payload}) + + +def _json_error(exc: Exception, *, status_code: int = 500): + return jsonify({"ok": False, "error": str(exc)}), status_code + + +def _require_token() -> None: + expected = worker_token() + got = request.headers.get("X-Qihuo-CTP-Token", "") + if expected and got != expected: + raise PermissionError("unauthorized") + + +@app.before_request +def _auth(): + _require_token() + + +@app.errorhandler(Exception) +def _handle_error(exc: Exception): + code = 401 if isinstance(exc, PermissionError) else 500 + logger.warning("ctp worker request failed: %s", exc) + return _json_error(exc, status_code=code) + + +def _mode_from_request() -> str: + data = request.get_json(silent=True) or {} + return ( + data.get("mode") + or request.args.get("mode") + or get_trading_mode(get_setting) + or "simulation" + ) + + +def _fast_status(mode: str) -> dict[str, Any]: + """Return worker/native bridge state without slow network probing.""" + from ctp_settings import CTP_DISABLED_HINT, is_ctp_auto_connect_enabled + + try: + st = dict(get_bridge().status(mode) or {}) + except Exception as exc: + st = { + "connected": False, + "connecting": False, + "connected_mode": None, + "last_error": str(exc), + "mode_label": "SimNow" if mode == "simulation" else "期货公司实盘", + } + auto = is_ctp_auto_connect_enabled() + st["auto_connect_enabled"] = auto + st["worker_online"] = True + if not auto: + st["disabled_hint"] = CTP_DISABLED_HINT + if not st.get("connected") and not st.get("connecting"): + st["last_error"] = "" + st["td_reachable"] = None + return st + + +def _send_wechat_msg(content: str) -> None: + webhook = get_setting("wechat_webhook", "") + if not webhook: + return + try: + import requests + + requests.post( + webhook, + json={"msgtype": "text", "text": {"content": f"【国内期货】\n{content}"}}, + timeout=10, + ) + except Exception as exc: + logger.debug("wechat notify failed: %s", exc) + + +def _init_worker_tables(conn) -> None: + init_strategy_tables(conn) + ensure_monitor_order_columns(conn) + + +def _capital(conn) -> float: + try: + return float(get_account_capital(get_setting, conn=conn) or 0) + except Exception: + return 0.0 + + +def _persist_snapshot(mode: str) -> None: + global _last_snapshot_ts + with _snapshot_lock: + now = time.time() + if now - _last_snapshot_ts < 0.25: + return + _last_snapshot_ts = now + try: + import json + + st = _fast_status(mode) + positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False) + account = ctp_get_account(mode) if st.get("connected") else {} + conn = connect_db(DB_PATH) + try: + conn.execute( + """CREATE TABLE IF NOT EXISTS ctp_worker_snapshots ( + key TEXT PRIMARY KEY, + value TEXT, + updated_at REAL + )""" + ) + for key, value in ( + ("status", st), + ("positions", positions), + ("account", account), + ): + conn.execute( + """INSERT INTO ctp_worker_snapshots(key, value, updated_at) + VALUES(?,?,?) + ON CONFLICT(key) DO UPDATE SET + value=excluded.value, + updated_at=excluded.updated_at""", + (key, json.dumps(value, ensure_ascii=False), now), + ) + commit_retry(conn) + finally: + conn.close() + except Exception as exc: + logger.debug("persist ctp snapshot: %s", exc) + + +def _on_position_refresh() -> None: + try: + _persist_snapshot(get_trading_mode(get_setting)) + except Exception as exc: + logger.debug("position refresh callback: %s", exc) + + +def _on_tick_quote() -> None: + _on_position_refresh() + + +def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None: + mode = get_trading_mode(get_setting) + if not ctp_status(mode).get("connected"): + return + conn = connect_db(DB_PATH) + try: + _init_worker_tables(conn) + capital = _capital(conn) + n = check_sl_tp_on_tick( + conn, + mode, + exchange, + symbol, + price, + capital=capital, + notify_fn=_send_wechat_msg, + be_tick_mult=get_trailing_be_tick_buffer(get_setting), + ) + if n: + commit_retry(conn) + _persist_snapshot(mode) + except Exception as exc: + logger.warning("worker tick sl/tp: %s", exc) + finally: + conn.close() + + +def _on_ctp_connected(mode: str) -> None: + try: + with _ctp_td_lock: + get_bridge().request_position_snapshot(force=True) + get_bridge().calibrate_trading_state() + _persist_snapshot(mode) + except Exception as exc: + logger.debug("worker ctp connected callback: %s", exc) + + +def _start_background_workers() -> None: + global _started_workers + if _started_workers: + return + _started_workers = True + + set_position_refresh_callback(_on_position_refresh) + set_tick_quote_callback(_on_tick_quote) + set_tick_sl_tp_callback(_on_tick_sl_tp) + set_ctp_connected_callback(_on_ctp_connected) + + from ctp_fee_worker import start_ctp_fee_worker + from ctp_premarket_connect import start_ctp_premarket_connect_worker + from ctp_reconnect import start_ctp_reconnect_worker + from order_pending import reconcile_pending_orders + from pending_order_worker import start_pending_order_worker + + def _mode() -> str: + return get_trading_mode(get_setting) + + start_ctp_reconnect_worker(get_mode_fn=_mode, get_setting_fn=get_setting) + start_ctp_premarket_connect_worker(get_mode_fn=_mode, get_setting_fn=get_setting) + start_ctp_fee_worker( + get_mode_fn=_mode, + get_setting_fn=get_setting, + set_setting_fn=set_setting, + ) + start_pending_order_worker( + db_path=DB_PATH, + get_mode_fn=_mode, + init_tables_fn=_init_worker_tables, + get_capital_fn=_capital, + reconcile_fn=reconcile_pending_orders, + on_changed_fn=lambda: _persist_snapshot(_mode()), + ) + start_sl_tp_guard_worker( + db_path=DB_PATH, + get_mode_fn=_mode, + init_tables_fn=_init_worker_tables, + get_capital_fn=_capital, + get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting), + notify_fn=_send_wechat_msg, + ) + + def _snapshot_loop() -> None: + time.sleep(3) + while True: + try: + mode = _mode() + if _fast_status(mode).get("connected"): + _persist_snapshot(mode) + except Exception as exc: + logger.debug("worker snapshot loop: %s", exc) + time.sleep(2 if is_trading_session() else 15) + + threading.Thread(target=_snapshot_loop, daemon=True, name="ctp-worker-snapshot").start() + + +@app.route("/health") +def health(): + mode = request.args.get("mode") or get_trading_mode(get_setting) + st = _fast_status(mode) + return _json_ok( + worker_online=True, + role=os.getenv("QIHUO_CTP_ROLE", "worker"), + mode=mode, + status=st, + ts=time.time(), + ) + + +@app.route("/ctp/status") +def api_status(): + mode = _mode_from_request() + return _json_ok(status=_fast_status(mode)) + + +@app.route("/ctp/connect", methods=["POST"]) +def api_connect(): + data = request.get_json(silent=True) or {} + mode = data.get("mode") or get_trading_mode(get_setting) + info = ctp_start_connect(mode, force=bool(data.get("force"))) + st = info.get("status") or _fast_status(mode) + return _json_ok(status=st, **{k: v for k, v in info.items() if k != "status"}) + + +@app.route("/ctp/start_connect", methods=["POST"]) +def api_start_connect(): + data = request.get_json(silent=True) or {} + mode = data.get("mode") or get_trading_mode(get_setting) + return _json_ok(**ctp_start_connect( + mode, + force=bool(data.get("force")), + scheduled=bool(data.get("scheduled")), + )) + + +@app.route("/ctp/disconnect", methods=["POST"]) +def api_disconnect(): + data = request.get_json(silent=True) or {} + ctp_disconnect(set_disabled_hint=bool(data.get("set_disabled_hint"))) + return _json_ok(disconnected=True) + + +@app.route("/ctp/account") +def api_account(): + mode = _mode_from_request() + if not _fast_status(mode).get("connected"): + return _json_ok(account={}) + return _json_ok(account=ctp_get_account(mode)) + + +@app.route("/ctp/positions", methods=["POST"]) +def api_positions(): + data = request.get_json(silent=True) or {} + mode = data.get("mode") or get_trading_mode(get_setting) + return _json_ok(positions=ctp_list_positions( + mode, + refresh_if_empty=bool(data.get("refresh_if_empty", True)), + refresh_margin=bool(data.get("refresh_margin", False)), + )) + + +@app.route("/ctp/trades", methods=["POST"]) +def api_trades(): + data = request.get_json(silent=True) or {} + mode = data.get("mode") or get_trading_mode(get_setting) + return _json_ok(trades=ctp_list_trades(mode, refresh=bool(data.get("refresh")))) + + +@app.route("/ctp/active_orders") +def api_active_orders(): + mode = _mode_from_request() + return _json_ok(orders=ctp_list_active_orders(mode)) + + +@app.route("/ctp/tick_price", methods=["POST"]) +def api_tick_price(): + data = request.get_json(silent=True) or {} + return _json_ok(price=ctp_get_tick_price( + data.get("mode") or get_trading_mode(get_setting), + data.get("symbol") or "", + )) + + +@app.route("/ctp/tick_detail", methods=["POST"]) +def api_tick_detail(): + data = request.get_json(silent=True) or {} + return _json_ok(detail=ctp_get_tick_detail( + data.get("mode") or get_trading_mode(get_setting), + data.get("symbol") or "", + )) + + +@app.route("/ctp/estimate_margin_one_lot", methods=["POST"]) +def api_estimate_margin(): + data = request.get_json(silent=True) or {} + return _json_ok(margin=ctp_estimate_margin_one_lot( + data.get("mode") or get_trading_mode(get_setting), + data.get("symbol") or "", + float(data.get("price") or 0), + direction=data.get("direction") or "long", + )) + + +@app.route("/ctp/contract_spec", methods=["POST"]) +def api_contract_spec(): + data = request.get_json(silent=True) or {} + return _json_ok(spec=ctp_lookup_contract_spec( + data.get("mode") or get_trading_mode(get_setting), + data.get("symbol") or "", + )) + + +@app.route("/ctp/order", methods=["POST"]) +def api_order(): + data = request.get_json(silent=True) or {} + mode = data.get("mode") or get_trading_mode(get_setting) + result = execute_order( + None, + mode=mode, + offset=data.get("offset") or "open", + symbol=data.get("symbol") or "", + direction=data.get("direction") or "long", + lots=int(data.get("lots") or 1), + price=float(data.get("price") or 0), + settings=data.get("settings") or {}, + order_type=data.get("order_type") or "limit", + ) + _persist_snapshot(mode) + return _json_ok(**result) + + +@app.route("/ctp/cancel", methods=["POST"]) +def api_cancel(): + data = request.get_json(silent=True) or {} + mode = data.get("mode") or get_trading_mode(get_setting) + cancelled = ctp_cancel_order(mode, data.get("vt_orderid") or "") + _persist_snapshot(mode) + return _json_ok(cancelled=cancelled) + + +@app.route("/ctp/bridge/", methods=["POST"]) +def api_bridge_action(action: str): + data = request.get_json(silent=True) or {} + b = get_bridge() + if action == "calibrate_trading_state": + return _json_ok(result=b.calibrate_trading_state()) + if action == "request_position_snapshot": + return _json_ok(result=b.request_position_snapshot(force=bool(data.get("force")))) + if action == "subscribe_symbol": + return _json_ok(result=b.subscribe_symbol(data.get("symbol") or "")) + if action == "refresh_positions": + return _json_ok(result=b.refresh_positions()) + if action == "connect_in_progress": + return _json_ok(result=b.connect_in_progress()) + if action == "reconnect_after_settings_saved": + mode = data.get("mode") or get_trading_mode(get_setting) + return _json_ok(result=b.reconnect_after_settings_saved(mode)) + if action == "query_all_commissions": + return _json_ok(result=b.query_all_commissions( + mode=data.get("mode") or get_trading_mode(get_setting), + )) + if action == "query_instrument_commission": + return _json_ok(result=b.query_instrument_commission( + data.get("symbol") or "", + mode=data.get("mode") or get_trading_mode(get_setting), + )) + if action == "get_kline_bars_1m": + return _json_ok(result=b.get_kline_bars_1m( + data.get("symbol") or "", + mode=data.get("mode") or get_trading_mode(get_setting), + )) + return _json_error(ValueError(f"unsupported bridge action: {action}"), status_code=404) + + +def main() -> None: + ensure_process_locale() + try_init_vnpy({}) + _start_background_workers() + host = os.getenv("QIHUO_CTP_WORKER_HOST", "127.0.0.1") + port = int(os.getenv("QIHUO_CTP_WORKER_PORT", "6601") or 6601) + logger.info("starting qihuo-ctp worker on %s:%s", host, port) + app.run(host=host, port=port, debug=False, threaded=True, use_reloader=False) + + +if __name__ == "__main__": + main() diff --git a/dashboard_lib.py b/dashboard_lib.py index 2c2178d..c3ec2e1 100644 --- a/dashboard_lib.py +++ b/dashboard_lib.py @@ -175,6 +175,18 @@ def build_dashboard_payload( margin_used = round(max(0.0, equity - available), 2) except Exception: pass + else: + from trading_context import _cached_ctp_account + + cached = _cached_ctp_account(mode) + balance = float(cached.get("balance") or 0) + if balance > 0: + equity = balance + avail = cached.get("available") + if avail is not None: + available = round(float(avail), 2) + if equity > 0: + margin_used = round(max(0.0, equity - available), 2) key_rows = conn.execute( """ diff --git a/ecosystem.config.cjs b/ecosystem.config.cjs index b270af3..24e8545 100644 --- a/ecosystem.config.cjs +++ b/ecosystem.config.cjs @@ -24,6 +24,9 @@ module.exports = { LANG: "zh_CN.UTF-8", LC_ALL: "zh_CN.UTF-8", LC_CTYPE: "zh_CN.UTF-8", + QIHUO_CTP_ROLE: "client", + QIHUO_CTP_WORKER_URL: "http://127.0.0.1:6601", + QIHUO_CTP_WORKER_TOKEN: "qihuo-local-ctp", QIHUO_STARTUP_WORKERS: "8", QIHUO_MEMORY_MB: "8192", }, @@ -31,5 +34,29 @@ module.exports = { out_file: path.join(ROOT, "logs", "pm2-out.log"), time: true, }, + { + name: "qihuo-ctp", + script: "ctp_worker.py", + cwd: ROOT, + interpreter, + instances: 1, + autorestart: true, + watch: false, + max_memory_restart: "8192M", + env: { + NODE_ENV: "production", + LANG: "zh_CN.UTF-8", + LC_ALL: "zh_CN.UTF-8", + LC_CTYPE: "zh_CN.UTF-8", + QIHUO_CTP_ROLE: "worker", + QIHUO_CTP_WORKER_HOST: "127.0.0.1", + QIHUO_CTP_WORKER_PORT: "6601", + QIHUO_CTP_WORKER_TOKEN: "qihuo-local-ctp", + QIHUO_MEMORY_MB: "8192", + }, + error_file: path.join(ROOT, "logs", "pm2-ctp-error.log"), + out_file: path.join(ROOT, "logs", "pm2-ctp-out.log"), + time: true, + }, ], }; diff --git a/install_trading.py b/install_trading.py index e70ee58..44346bd 100644 --- a/install_trading.py +++ b/install_trading.py @@ -80,6 +80,7 @@ from strategy.strategy_roll_lib import ( ADD_MODE_BREAKOUT, ADD_MODE_MARKET, FIB_MODES, + LEG_STATUS_CANCELLED, LEG_STATUS_FILLED, LEG_STATUS_PENDING, PENDING_MODES, @@ -152,6 +153,7 @@ logger = logging.getLogger(__name__) def install_trading(app, *, login_required, require_nav, get_db, get_setting, set_setting, fetch_price, send_wechat_msg): """注册交易相关路由。""" _nav = require_nav + _live_refresh_lock = threading.Lock() def _sizing_mode_label(mode: str) -> str: m = normalize_sizing_mode(mode) @@ -447,11 +449,90 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception as exc: logger.debug("persist monitor ctp snapshot %s: %s", mid, exc) - def _ensure_monitors_from_ctp(conn, mode: str) -> None: + def _positions_from_live_snapshot() -> list[dict]: + snap = position_hub.get_snapshot() or {} + out: list[dict] = [] + for row in snap.get("rows") or []: + lots = int(row.get("lots") or 0) + if lots <= 0 or row.get("order_state") == "pending": + continue + sym = ( + row.get("symbol_code") + or row.get("ths_code") + or row.get("symbol") + or "" + ) + if not sym: + continue + out.append({ + "symbol": sym, + "direction": row.get("direction") or "long", + "lots": lots, + "avg_price": row.get("entry_price") or row.get("avg_price") or 0, + "open_time": row.get("open_time") or "", + "margin": row.get("margin"), + "pnl": row.get("float_pnl"), + "mark_price": row.get("mark_price") or row.get("current_price"), + "exchange": row.get("exchange") or "", + }) + return out + + def _positions_for_monitor_restore(mode: str, *, allow_ctp: bool = True) -> list[dict]: + if allow_ctp: + positions = list(_ctp_positions(mode, refresh_if_empty=True) or []) + if positions: + return positions + positions = list(trading_state.get_positions() or []) + if positions: + return positions + positions = _positions_from_live_snapshot() + if not allow_ctp: + return positions + margin_used = float(ctp_account_margin_used(mode) or 0) + if margin_used <= 100 or not positions: + return [] + return positions + + def _cached_position_mark(sym: str, direction: str = "") -> Optional[float]: + sym_l = (sym or "").strip().lower() + direction_l = (direction or "").strip().lower() + for p in list(trading_state.get_positions() or []) + _positions_from_live_snapshot(): + if direction_l and (p.get("direction") or "long").strip().lower() != direction_l: + continue + ps = (p.get("symbol") or "").strip() + if not ps: + continue + if not _match_ctp_symbol(ps, sym_l): + continue + for key in ("mark_price", "current_price", "last_price"): + val = p.get(key) + try: + px = float(val or 0) + except (TypeError, ValueError): + px = 0.0 + if px > 0: + return px + snap = position_hub.get_snapshot() or {} + for row in snap.get("rows") or []: + rs = row.get("symbol_code") or row.get("symbol") or "" + if not rs or not _match_ctp_symbol(rs, sym_l): + continue + if direction_l and (row.get("direction") or "long").strip().lower() != direction_l: + continue + for key in ("mark_price", "current_price", "last_price", "entry_price"): + try: + px = float(row.get(key) or 0) + except (TypeError, ValueError): + px = 0.0 + if px > 0: + return px + return None + + def _ensure_monitors_from_ctp(conn, mode: str, *, allow_ctp: bool = True) -> None: """CTP 有持仓但本地无监控时,自动补写一条 active 记录供展示。""" if not ctp_status(mode).get("connected"): return - ctp_positions = _ctp_positions(mode, refresh_if_empty=True) + ctp_positions = _positions_for_monitor_restore(mode, allow_ctp=allow_ctp) for p in ctp_positions: lots = int(p.get("lots") or 0) if lots <= 0: @@ -615,13 +696,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se direction: str, mode: str, fallback: float = 0.0, + *, + allow_ctp: bool = False, ) -> float: """滚仓/展示用均价:仅柜台持仓价。""" if not ctp_status(mode).get("connected"): return fallback - for p in trading_state.get_positions() or _ctp_positions( - mode, refresh_if_empty=False, - ): + positions = list(trading_state.get_positions() or []) + if not positions: + positions = _positions_from_live_snapshot() + if not positions and allow_ctp: + positions = _ctp_positions(mode, refresh_if_empty=False) + for p in positions: if (p.get("direction") or "long") != (direction or "long"): continue if not _match_ctp_symbol(p.get("symbol") or "", sym): @@ -767,9 +853,25 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return reconcile_monitors_without_position(conn, mode) def _effective_active_position_count(conn, mode: str) -> int: - """风控持仓数以本地 active 监控为准,不随 CTP 内存空窗抖动。""" - del mode - return count_active_trade_monitors(conn) + """风控持仓数以柜台/快照实际持仓优先,本地监控作兜底。""" + monitor_count = count_active_trade_monitors(conn) + if not ctp_status(mode).get("connected"): + return monitor_count + keys: set[tuple[str, str]] = set() + for p in _positions_for_monitor_restore(mode, allow_ctp=False): + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + sym = ( + p.get("symbol") + or p.get("symbol_code") + or p.get("ths_code") + or "" + ).strip().lower() + direction = (p.get("direction") or "long").strip().lower() + if sym: + keys.add((sym, direction)) + return max(monitor_count, len(keys)) def _build_pending_orders(conn, mode: str) -> list[dict]: pending: list[dict] = [] @@ -944,7 +1046,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if not mon: rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction) if rsl is None and rtp is None and not rtrail: - return None + return {"symbol": sym, "direction": direction} return { "symbol": sym, "direction": direction, @@ -1727,14 +1829,20 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") mode = get_trading_mode(get_setting) capital = _capital(conn) - ensure_monitor_order_columns(conn) - monitor_by_pk = _monitors_by_position_key(conn) ctp_list: list[dict] = [] if ctp_status(mode).get("connected"): - ctp_list = _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False) - if not ctp_list: - ctp_list = trading_state.get_positions() + merged: dict[str, dict] = {} + for p in list(_ctp_positions(mode) or []) + list(trading_state.get_positions() or []): + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + pk = p.get("position_key") or _position_key_from_ctp(p) + merged[pk] = p + ctp_list = list(merged.values()) + + ensure_monitor_order_columns(conn) + monitor_by_pk = _monitors_by_position_key(conn) rows: list[dict] = [] for p in ctp_list: @@ -1890,7 +1998,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) capital = _capital(conn) - if ctp_st.get("connected") and (not fast or _has_pending_monitors(conn)): + if ctp_st.get("connected") and not fast: _reconcile_pending(conn, mode, capital=capital) if ctp_st.get("connected"): if not fast: @@ -1900,6 +2008,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se margin_raw = ctp_account_margin_used(mode) if margin_raw is not None and float(margin_raw) > 0: _ensure_monitors_from_sticky_state(conn, mode) + if not fast: + _close_stale_roll_groups(conn) rows = _build_trading_live_rows(conn, fast=fast) active_orders = _build_active_orders( conn, mode=mode, capital=capital, now_iso=now_iso, @@ -1916,6 +2026,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se margin_used = ( ctp_account_margin_used(mode) if ctp_st.get("connected") else None ) + display_sync_state = "ready" if rows else trading_state.sync_state + display_sync_label = "已同步" if rows else trading_state.sync_label() return { "ok": True, "rows": rows, @@ -1930,62 +2042,157 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "night_session": is_night_trading_session(), "session_clock": trading_session_clock(), "pending_order_timeout_min": get_pending_order_timeout_min(get_setting), + "sync_state": display_sync_state, + "sync_label": display_sync_label, + } + + def _minimal_live_payload(conn) -> dict: + """CTP 直出兜底:不跑对账/写库,避免与后台 worker 争锁。""" + from zoneinfo import ZoneInfo + + tz = ZoneInfo("Asia/Shanghai") + now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") + mode = get_trading_mode(get_setting) + ctp_st = ctp_status(mode) + capital = _capital(conn) + rows: list[dict] = [] + if ctp_st.get("connected"): + for p in _ctp_positions(mode, refresh_if_empty=False): + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") + direction = p.get("direction") or "long" + mon = {"symbol": ths, "direction": direction} + try: + row = _compose_position_row( + conn, + mon=mon, + ctp=p, + mode=mode, + capital=capital, + now_iso=now_iso, + fast=True, + ) + if row: + rows.append(row) + except Exception as exc: + logger.warning("minimal live row failed: %s", exc) + risk = get_risk_status( + conn, + active_count=_effective_active_position_count(conn, mode), + equity=capital, + ) + return { + "ok": True, + "rows": rows, + "active_orders": [], + "pending_orders": [], + "capital": capital, + "ctp_status": ctp_st, + "trading_mode_label": trading_mode_label(get_setting), + "risk_status": risk, + "trading_session": is_trading_session(), + "night_session": is_night_trading_session(), + "session_clock": trading_session_clock(), + "pending_order_timeout_min": get_pending_order_timeout_min(get_setting), "sync_state": trading_state.sync_state, "sync_label": trading_state.sync_label(), } + def _normalize_live_payload(payload: dict) -> dict: + if payload.get("rows"): + payload = dict(payload) + payload["sync_state"] = "ready" + payload["sync_label"] = "已同步" + return payload + def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict: - mode = get_trading_mode(get_setting) - if ctp_status(mode).get("connected") and not fast: + def _build() -> dict: + mode = get_trading_mode(get_setting) + if ctp_status(mode).get("connected") and not fast: + try: + with _ctp_td_lock: + get_bridge().calibrate_trading_state() + except Exception as exc: + logger.debug("refresh calibrate: %s", exc) + for p in trading_state.get_positions() or _ctp_positions(mode, refresh_if_empty=False): + ths = _ctp_pos_to_ths_code(p) + if ths: + try: + get_bridge().subscribe_symbol(ths) + except Exception: + pass + conn = get_db() try: - with _ctp_td_lock: - get_bridge().calibrate_trading_state() - except Exception as exc: - logger.debug("refresh calibrate: %s", exc) - for p in trading_state.get_positions() or _ctp_positions(mode, refresh_if_empty=False): - ths = _ctp_pos_to_ths_code(p) - if ths: - try: - get_bridge().subscribe_symbol(ths) - except Exception: - pass - conn = get_db() - try: - init_strategy_tables(conn) - payload = _build_trading_live_payload(conn, fast=fast) - commit_retry(conn) - prev = position_hub.get_snapshot() - active_n = int((payload.get("risk_status") or {}).get("active_count") or 0) - if ( - prev - and ctp_status(mode).get("connected") - and not (payload.get("rows") or []) - and (prev.get("rows") or []) - ): - margin_raw = payload.get("margin_used") - if margin_raw is None: - margin_raw = ctp_account_margin_used(mode) - margin_used = float(margin_raw or 0) if margin_raw is not None else 0.0 + init_strategy_tables(conn) + if not fast: + ensure_monitor_order_columns(conn, migrate=True) + payload = _build_trading_live_payload(conn, fast=fast) + commit_retry(conn) + prev = position_hub.get_snapshot() + active_n = int((payload.get("risk_status") or {}).get("active_count") or 0) if ( - (margin_raw is not None and margin_used > 0) - or trading_state.sync_state == "syncing" - or active_n > 0 + prev + and ctp_status(mode).get("connected") + and not (payload.get("rows") or []) + and (prev.get("rows") or []) + ): + margin_raw = payload.get("margin_used") + if margin_raw is None: + margin_raw = ctp_account_margin_used(mode) + margin_used_val = float(margin_raw or 0) if margin_raw is not None else 0.0 + if ( + (margin_raw is not None and margin_used_val > 0) + or trading_state.sync_state == "syncing" + or active_n > 0 + ): + payload = dict(payload) + payload["rows"] = prev["rows"] + if trading_state.sync_state == "syncing": + payload["sync_state"] = "syncing" + payload["sync_label"] = "同步中…" + elif ( + ctp_status(mode).get("connected") + and not (payload.get("rows") or []) + and active_n > 0 ): payload = dict(payload) - payload["rows"] = prev["rows"] - if trading_state.sync_state == "syncing": + payload["rows"] = _build_trading_live_rows(conn, fast=fast) + elif ctp_status(mode).get("connected") and not (payload.get("rows") or []): + since_connect = time.time() - float( + getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0, + ) + if since_connect < 180: + payload = dict(payload) payload["sync_state"] = "syncing" - payload["sync_label"] = "同步中…" - elif ( - ctp_status(mode).get("connected") - and not (payload.get("rows") or []) - and active_n > 0 - ): - payload = dict(payload) - payload["rows"] = _build_trading_live_rows(conn, fast=fast) - return payload - finally: - conn.close() + payload["sync_label"] = "持仓同步中…" + return _normalize_live_payload(payload) + finally: + conn.close() + + if fast: + if _live_refresh_lock.acquire(blocking=False): + try: + return _build() + finally: + _live_refresh_lock.release() + snap = position_hub.get_snapshot() + if snap: + return snap + if _live_refresh_lock.acquire(timeout=2.0): + try: + return _build() + finally: + _live_refresh_lock.release() + conn = get_db() + try: + init_strategy_tables(conn) + return _minimal_live_payload(conn) + finally: + conn.close() + with _live_refresh_lock: + return _build() def _push_position_snapshot_async(*, fast: bool = True) -> None: def _run() -> None: @@ -2115,13 +2322,31 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _warm() -> None: try: - mode = get_trading_mode(get_setting) - if ctp_status(mode).get("connected"): - with _ctp_td_lock: - get_bridge().calibrate_trading_state() - payload = _refresh_trading_live_snapshot(fast=False) + payload = _refresh_trading_live_snapshot(fast=True) position_hub.set_snapshot(payload) position_hub.broadcast("positions", payload) + mode = get_trading_mode(get_setting) + if ctp_status(mode).get("connected"): + try: + with _ctp_td_lock: + get_bridge().calibrate_trading_state() + get_bridge().request_position_snapshot(force=True) + except Exception as exc: + logger.debug("bootstrap calibrate: %s", exc) + payload = _refresh_trading_live_snapshot(fast=True) + position_hub.set_snapshot(payload) + position_hub.broadcast("positions", payload) + + def _slow_sync() -> None: + time.sleep(20) + try: + pl = _refresh_trading_live_snapshot(fast=False) + position_hub.set_snapshot(pl) + position_hub.broadcast("positions", pl) + except Exception as exc: + logger.warning("bootstrap slow sync: %s", exc) + + threading.Thread(target=_slow_sync, daemon=True, name="boot-slow-sync").start() except Exception as exc: logger.warning("bootstrap position snapshot: %s", exc) @@ -2147,14 +2372,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if mode != get_trading_mode(get_setting): return _schedule_recommend_refresh() + _push_position_snapshot_async(fast=True) def _after_connect() -> None: try: try: with _ctp_td_lock: + get_bridge().request_position_snapshot(force=True) get_bridge().calibrate_trading_state() except Exception as exc: logger.debug("ctp connected calibrate: %s", exc) + _push_position_snapshot_async(fast=True) conn = get_db() try: init_strategy_tables(conn) @@ -2273,7 +2501,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @app.route("/api/trading/live") @login_required def api_trading_live(): + snap = position_hub.get_snapshot() + if snap: + return jsonify(_normalize_live_payload(snap)) payload = _refresh_trading_live_snapshot(fast=True) + payload = _normalize_live_payload(payload) + position_hub.set_snapshot(payload) return jsonify(payload) @app.route("/api/trading/stream") @@ -2612,7 +2845,72 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _roll_ui_modes(): return frozenset({ADD_MODE_MARKET, ADD_MODE_BREAKOUT}) - def _enrich_roll_group_row(row: dict) -> dict: + def _cached_ctp_status(mode: str) -> dict: + """页面渲染优先读持仓快照里的 CTP 状态,避免每次打 worker IPC。""" + try: + snap = position_hub.get_snapshot() or {} + st = snap.get("ctp_status") + if isinstance(st, dict) and st: + return dict(st) + except Exception: + pass + return dict(ctp_status(mode) or {}) + + def _roll_filled_lots_map(conn, group_ids: list[int]) -> dict[int, int]: + if not group_ids: + return {} + placeholders = ",".join("?" * len(group_ids)) + rows = conn.execute( + f"""SELECT roll_group_id, COALESCE(SUM(lots), 0) AS n + FROM roll_legs + WHERE roll_group_id IN ({placeholders}) AND status=? + GROUP BY roll_group_id""", + (*group_ids, LEG_STATUS_FILLED), + ).fetchall() + return {int(r["roll_group_id"]): int(r["n"] or 0) for r in rows} + + def _build_roll_context(conn) -> dict: + has_trend = bool(conn.execute( + "SELECT 1 FROM trend_pullback_plans WHERE status='active' LIMIT 1", + ).fetchone()) + groups_by_monitor: dict[int, dict] = {} + pending_monitors: set[int] = set() + for row in conn.execute( + "SELECT * FROM roll_groups WHERE status='active'", + ).fetchall(): + g = dict(row) + mid = int(g.get("order_monitor_id") or 0) + if mid: + groups_by_monitor[mid] = g + for row in conn.execute( + """SELECT g.order_monitor_id + FROM roll_legs l + JOIN roll_groups g ON g.id = l.roll_group_id + WHERE l.status=? AND g.status='active'""", + (LEG_STATUS_PENDING,), + ).fetchall(): + mid = int(row["order_monitor_id"] or 0) + if mid: + pending_monitors.add(mid) + return { + "has_trend": has_trend, + "groups_by_monitor": groups_by_monitor, + "pending_monitors": pending_monitors, + } + + def _roll_eligibility_with_ctx(conn, mon: dict, ctx: dict) -> Optional[str]: + mid = int(mon["id"]) + grp = ctx["groups_by_monitor"].get(mid) + legs_done = int(grp.get("leg_count") or 0) if grp else 0 + return roll_eligibility_error( + sizing_mode=get_sizing_mode(get_setting), + monitor=mon, + has_active_trend=ctx["has_trend"], + legs_done=legs_done, + has_pending_leg=mid in ctx["pending_monitors"], + ) + + def _enrich_roll_group_row_fast(row: dict, filled_map: dict[int, int]) -> dict: out = dict(row) lots = float(out.get("mon_lots") or 0) entry = float(out.get("mon_entry") or 0) @@ -2620,6 +2918,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se direction = (out.get("direction") or "long").strip().lower() sym = (out.get("symbol") or "").strip() mult = int(get_contract_spec(sym).get("mult") or 1) if sym else 1 + gid = int(out.get("id") or 0) + filled_add_lots = int(filled_map.get(gid) or 0) + out["add_lots_filled"] = filled_add_lots + out["first_lots"] = max(0, int(lots) - filled_add_lots) + out["total_lots"] = int(lots) out["avg_entry"] = round(entry, 4) if entry > 0 else None if lots > 0 and entry > 0 and tp > 0: if direction == "long": @@ -2630,6 +2933,134 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se out["reward_at_tp"] = None return out + def _enrich_roll_group_row(conn, row: dict) -> dict: + gid = int(row.get("id") or 0) + filled_map = _roll_filled_lots_map(conn, [gid]) if gid > 0 else {} + return _enrich_roll_group_row_fast(row, filled_map) + + def _archive_roll_group( + conn, + grp: dict, + *, + result_label: str = "持仓已结束", + ) -> None: + from zoneinfo import ZoneInfo + + now_s = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S") + gid = int(grp.get("id") or 0) + if gid <= 0: + return + if conn.execute( + "SELECT 1 FROM strategy_trade_snapshots WHERE strategy_type=? AND source_id=? LIMIT 1", + (STRATEGY_ROLL, gid), + ).fetchone(): + conn.execute( + "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=?", + (now_s, gid), + ) + return + legs = [ + dict(r) for r in conn.execute( + "SELECT * FROM roll_legs WHERE roll_group_id=? ORDER BY id", + (gid,), + ).fetchall() + ] + mon = None + mid = int(grp.get("order_monitor_id") or 0) + if mid: + row = conn.execute( + "SELECT * FROM trade_order_monitors WHERE id=?", + (mid,), + ).fetchone() + mon = dict(row) if row else None + payload = { + "group": dict(grp), + "legs": legs, + "monitor": mon, + } + save_snapshot( + conn, + strategy_type=STRATEGY_ROLL, + source_id=gid, + symbol=grp.get("symbol") or (mon or {}).get("symbol") or "", + direction=grp.get("direction") or (mon or {}).get("direction") or "", + result_label=result_label, + payload=payload, + opened_at=grp.get("created_at") or "", + ) + conn.execute( + "UPDATE roll_legs SET status=? WHERE roll_group_id=? AND status=?", + (LEG_STATUS_CANCELLED, gid, LEG_STATUS_PENDING), + ) + conn.execute( + "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=?", + (now_s, gid), + ) + + def _close_stale_roll_groups(conn) -> int: + rows = conn.execute( + """SELECT g.*, m.status AS monitor_status + FROM roll_groups g + LEFT JOIN trade_order_monitors m ON m.id = g.order_monitor_id + WHERE g.status='active' + AND (m.id IS NULL OR m.status != 'active')""" + ).fetchall() + for r in rows: + _archive_roll_group(conn, dict(r), result_label="持仓已结束") + return len(rows) + + def _enrich_roll_leg_row(row: dict, mode: str) -> dict: + out = dict(row) + sym = (out.get("symbol") or "").strip() + mark = _cached_position_mark(sym, out.get("direction") or "") if sym else None + out["current_price"] = round(float(mark), 4) if mark and mark > 0 else None + return out + + def _enrich_roll_record_row(conn, row: dict) -> dict: + out = dict(row) + snap = out.get("snapshot") or {} + group = snap.get("group") or {} + legs = snap.get("legs") or [] + monitor = snap.get("monitor") or {} + filled_legs = [ + l for l in legs + if (l.get("status") or "").strip().lower() == LEG_STATUS_FILLED + ] + add_lots = sum(int(l.get("lots") or 0) for l in filled_legs) + total_lots = int((monitor or {}).get("lots") or 0) + first_lots = max(0, total_lots - add_lots) + latest_sl = ( + group.get("current_stop_loss") + or (monitor or {}).get("stop_loss") + or None + ) + close_log = None + try: + close_log = conn.execute( + """SELECT close_price, pnl, pnl_net, close_time, lots + FROM trade_logs + WHERE lower(symbol)=lower(?) AND direction=? + ORDER BY close_time DESC, id DESC LIMIT 1""", + (out.get("symbol") or "", out.get("direction") or ""), + ).fetchone() + except Exception: + close_log = None + close_d = dict(close_log) if close_log else {} + out["detail"] = { + "first_lots": first_lots if first_lots > 0 else None, + "add_count": len(filled_legs), + "add_lots": add_lots, + "total_lots": total_lots if total_lots > 0 else None, + "latest_stop_loss": latest_sl, + "close_price": close_d.get("close_price"), + "close_time": close_d.get("close_time") or out.get("closed_at"), + "pnl": close_d.get("pnl_net") if close_d.get("pnl_net") is not None else close_d.get("pnl"), + "legs": filled_legs, + "monitor": monitor, + "group": group, + } + return out + def _roll_leg_trigger_price(leg: dict): for key in ("breakthrough_price", "limit_price", "fill_price"): val = leg.get(key) @@ -2642,67 +3073,82 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @_nav("strategy") def strategy_page(): conn = get_db() - init_strategy_tables(conn) - ensure_monitor_order_columns(conn) - capital = _capital(conn) - active_trend = conn.execute( - "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" - ).fetchone() - monitors_raw = conn.execute( - "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" - ).fetchall() - roll_groups = conn.execute( - """SELECT g.*, m.symbol_name, m.lots AS mon_lots, m.entry_price AS mon_entry, - m.take_profit AS mon_tp - FROM roll_groups g - LEFT JOIN trade_order_monitors m ON m.id = g.order_monitor_id - WHERE g.status='active' ORDER BY g.id DESC""" - ).fetchall() - roll_legs = conn.execute( - """SELECT l.*, g.symbol, g.direction, g.order_monitor_id - FROM roll_legs l - JOIN roll_groups g ON g.id = l.roll_group_id - ORDER BY l.id DESC LIMIT 30""" - ).fetchall() - sizing = get_sizing_mode(get_setting) - roll_allowed = sizing == MODE_AMOUNT - monitors = [] - for m in monitors_raw: - row = dict(m) - err = _roll_eligibility(conn, row) - row["roll_eligible"] = roll_allowed and err is None - if not roll_allowed: - row["roll_block_reason"] = "仅固定金额(以损定仓)模式可滚仓" - else: - row["roll_block_reason"] = err or "" - monitors.append(row) - active_trend_row = dict(active_trend) if active_trend else None - if active_trend_row: - active_trend_row["period_label"] = trend_period_label(active_trend_row.get("period") or "15m") - conn.close() - return render_template( - "strategy.html", - capital=capital, - fixed_amount=get_fixed_amount(get_setting), - sizing_mode=sizing, - sizing_mode_label=_sizing_mode_label(sizing), - roll_allowed=roll_allowed, - active_trend=active_trend_row, - monitors=monitors, - roll_groups=[_enrich_roll_group_row(dict(g)) for g in roll_groups], - roll_legs=[dict(l) for l in roll_legs], - trend_periods=trend_strategy_periods(), - add_mode_labels={ - "market": "市价加仓", - "breakout": "突破加仓", - }, - roll_leg_status_labels={ - "pending": "监控中", - "filled": "已成交", - "cancelled": "已删除", - "invalidated": "已失效", - }, - ) + try: + init_strategy_tables(conn) + ensure_monitor_order_columns(conn) + capital = _capital(conn) + active_trend = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" + ).fetchone() + monitors_raw = conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" + ).fetchall() + mode = get_trading_mode(get_setting) + roll_ctx = _build_roll_context(conn) + roll_groups = conn.execute( + """SELECT g.*, m.symbol_name, m.lots AS mon_lots, m.entry_price AS mon_entry, + m.take_profit AS mon_tp + FROM roll_groups g + LEFT JOIN trade_order_monitors m ON m.id = g.order_monitor_id + WHERE g.status='active' ORDER BY g.id DESC""" + ).fetchall() + roll_legs = conn.execute( + """SELECT l.*, g.symbol, g.direction, g.order_monitor_id + FROM roll_legs l + JOIN roll_groups g ON g.id = l.roll_group_id + WHERE l.status=? AND g.status='active' + ORDER BY l.id DESC LIMIT 30""", + (LEG_STATUS_PENDING,), + ).fetchall() + sizing = get_sizing_mode(get_setting) + roll_allowed = sizing == MODE_AMOUNT + monitors = [] + for m in monitors_raw: + row = dict(m) + err = _roll_eligibility_with_ctx(conn, row, roll_ctx) + row["roll_eligible"] = roll_allowed and err is None + if not roll_allowed: + row["roll_block_reason"] = "仅固定金额(以损定仓)模式可滚仓" + else: + row["roll_block_reason"] = err or "" + monitors.append(row) + active_trend_row = dict(active_trend) if active_trend else None + if active_trend_row: + active_trend_row["period_label"] = trend_period_label( + active_trend_row.get("period") or "15m", + ) + group_ids = [int(g["id"]) for g in roll_groups if g["id"]] + filled_map = _roll_filled_lots_map(conn, group_ids) + enriched_groups = [ + _enrich_roll_group_row_fast(dict(g), filled_map) for g in roll_groups + ] + enriched_legs = [_enrich_roll_leg_row(dict(l), mode) for l in roll_legs] + return render_template( + "strategy.html", + capital=capital, + fixed_amount=get_fixed_amount(get_setting), + sizing_mode=sizing, + sizing_mode_label=_sizing_mode_label(sizing), + roll_allowed=roll_allowed, + active_trend=active_trend_row, + monitors=monitors, + roll_groups=enriched_groups, + roll_legs=enriched_legs, + trading_session=is_trading_session(), + session_clock=trading_session_clock(), + trend_periods=trend_strategy_periods(), + add_mode_labels={ + "market": "市价加仓", + "breakout": "突破加仓", + }, + roll_leg_status_labels={ + "pending": "监控中", + "filled": "已成交", + "cancelled": "已取消", + }, + ) + finally: + conn.close() @app.route("/strategy/records") @login_required @@ -2710,6 +3156,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn = get_db() init_strategy_tables(conn) trend, roll = list_snapshots(conn) + roll = [_enrich_roll_record_row(conn, r) for r in roll] conn.close() return render_template("strategy_records.html", trend_rows=trend, roll_rows=roll) @@ -3079,6 +3526,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode = get_trading_mode(get_setting) body = request.get_json(silent=True) or {} force = bool(body.get("force")) + auto = bool(body.get("auto")) + # 自动连接仅由 qihuo-ctp 后台 worker 发起;Web 只读状态,避免换页重复 connect。 + if auto and not force: + st = ctp_status(mode) + acc = _ctp_account(mode) if st.get("connected") else {} + return jsonify({ + "ok": True, + "connecting": bool(st.get("connecting")), + "backend_managed": True, + "status": st, + "account": acc, + }) info = ctp_start_connect(mode, force=force) st = info.get("status") or ctp_status(mode) acc = _ctp_account(mode) if st.get("connected") else {} @@ -3136,6 +3595,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn.commit() ctp_acc = _ctp_account(mode) if ctp_st.get("connected") else {} positions = _ctp_positions(mode) if ctp_st.get("connected") else [] + if ctp_st.get("connected") and not positions: + positions = _positions_for_monitor_restore(mode) return jsonify({ "capital": capital, "trading_mode": mode, @@ -3344,20 +3805,70 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se (int(grp["id"]), LEG_STATUS_PENDING), ).fetchone()) - def _roll_eligibility(conn, mon: dict) -> Optional[str]: - has_trend = bool(conn.execute( - "SELECT 1 FROM trend_pullback_plans WHERE status='active' LIMIT 1", - ).fetchone()) - return roll_eligibility_error( - sizing_mode=get_sizing_mode(get_setting), - monitor=mon, - has_active_trend=has_trend, - legs_done=_roll_filled_legs(conn, int(mon["id"])), - has_pending_leg=_roll_has_pending(conn, int(mon["id"])), - ) + def _roll_eligibility(conn, mon: dict, ctx: Optional[dict] = None) -> Optional[str]: + if ctx is None: + ctx = _build_roll_context(conn) + return _roll_eligibility_with_ctx(conn, mon, ctx) - def _roll_mark_price(sym: str, mon: dict, mode: str) -> float: - mark = ctp_get_tick_price(mode, sym) if ctp_status(mode).get("connected") else None + def _roll_monitor_for_request(conn, mon_id: int) -> Optional[dict]: + row = conn.execute( + "SELECT * FROM trade_order_monitors WHERE id=?", + (int(mon_id),), + ).fetchone() + if not row: + return None + mon = dict(row) + if (mon.get("status") or "").strip().lower() == "active": + return mon + mode = get_trading_mode(get_setting) + if not _cached_ctp_status(mode).get("connected"): + return None + sym = (mon.get("symbol") or "").strip() + direction = (mon.get("direction") or "long").strip().lower() + for p in _positions_for_monitor_restore(mode, allow_ctp=False): + if int(p.get("lots") or 0) <= 0: + continue + if (p.get("direction") or "long").strip().lower() != direction: + continue + if not _match_ctp_symbol(p.get("symbol") or "", sym): + continue + execute_retry( + conn, + "UPDATE trade_order_monitors SET status='active' WHERE id=?", + (int(mon_id),), + ) + mon["status"] = "active" + _sync_monitor_from_ctp( + conn, + int(mon_id), + sym, + direction, + mode, + ctp=p, + capital=_capital(conn), + ) + fresh = conn.execute( + "SELECT * FROM trade_order_monitors WHERE id=?", + (int(mon_id),), + ).fetchone() + return dict(fresh) if fresh else mon + return None + + def _roll_mark_price( + sym: str, + mon: dict, + mode: str, + *, + allow_ctp: bool = False, + ) -> float: + mark = _cached_position_mark(sym, (mon or {}).get("direction") or "") + if mark and mark > 0: + return float(mark) + mark = ( + ctp_get_tick_price(mode, sym) + if allow_ctp and ctp_status(mode).get("connected") + else None + ) if mark and mark > 0: return float(mark) px = fetch_price(sym) @@ -3369,11 +3880,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se sym = mon["symbol"] spec = get_contract_spec(sym) capital = _capital(conn) - mark = _roll_mark_price(sym, mon, mode) + add_mode = (d.get("add_mode") or ADD_MODE_MARKET).strip().lower() + off_session_breakout = add_mode == ADD_MODE_BREAKOUT and not is_trading_session() + mark = _roll_mark_price(sym, mon, mode, allow_ctp=not off_session_breakout) + if (not mark or mark <= 0) and off_session_breakout: + bt = float(d.get("breakthrough_price") or 0) + mark = bt if bt > 0 else float(mon.get("entry_price") or 0) entry_existing = _live_entry_price( sym, mon["direction"], mode, float(mon.get("entry_price") or 0), + allow_ctp=False, ) - add_mode = (d.get("add_mode") or ADD_MODE_MARKET).strip().lower() if add_mode in FIB_MODES: return None, "斐波加仓已停用,请选市价或突破" if add_mode not in _roll_ui_modes(): @@ -3397,6 +3913,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se fib_upper=d.get("fib_upper"), fib_lower=d.get("fib_lower"), legs_done=legs_done, + off_session_pending=off_session_breakout, ) if err: return None, err @@ -3593,11 +4110,13 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se try: check_roll_monitors( conn, - get_mark_price_fn=lambda sym: _roll_mark_price(sym, {}, mode), + get_mark_price_fn=lambda sym: _roll_mark_price(sym, {}, mode, allow_ctp=True), fill_roll_leg_fn=_fill_roll_leg_cb, is_trading_session_fn=is_trading_session, get_risk_budget_fn=lambda: get_fixed_amount(get_setting), - get_entry_price_fn=lambda sym, d, fb: _live_entry_price(sym, d, mode, fb), + get_entry_price_fn=lambda sym, d, fb: _live_entry_price( + sym, d, mode, fb, allow_ctp=True, + ), ) conn.commit() finally: @@ -3622,11 +4141,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se qty_existing = float(mon.get("lots") or 0) entry_existing = _live_entry_price( sym, direction, mode, float(mon.get("entry_price") or 0), + allow_ctp=False, ) mult = int(get_contract_spec(sym).get("mult") or 1) roll_pct = get_roll_max_margin_pct(get_setting) add_lots = int(preview.get("add_lots") or 0) - positions = _ctp_positions(mode, refresh_if_empty=False) + positions = _positions_for_monitor_restore(mode, allow_ctp=False) capped, usage = cap_lots_for_margin_budget( positions, capital, sym, direction, price, add_lots, roll_pct, trading_mode=mode, ) @@ -3666,14 +4186,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se init_strategy_tables(conn) ensure_monitor_order_columns(conn) mon_id = int(d.get("monitor_id") or 0) - mon = conn.execute( - "SELECT * FROM trade_order_monitors WHERE id=? AND status='active'", (mon_id,), - ).fetchone() + roll_ctx = _build_roll_context(conn) + mon = _roll_monitor_for_request(conn, mon_id) if not mon: conn.close() return jsonify({"ok": False, "error": "无有效持仓监控"}), 400 + conn.commit() mon_d = dict(mon) - err = _roll_eligibility(conn, mon_d) + err = _roll_eligibility(conn, mon_d, roll_ctx) if err: conn.close() return jsonify({"ok": False, "error": err}), 400 @@ -3692,14 +4212,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se init_strategy_tables(conn) ensure_monitor_order_columns(conn) mon_id = int(d.get("monitor_id") or 0) - mon = conn.execute( - "SELECT * FROM trade_order_monitors WHERE id=? AND status='active'", (mon_id,), - ).fetchone() + roll_ctx = _build_roll_context(conn) + mon = _roll_monitor_for_request(conn, mon_id) if not mon: conn.close() return jsonify({"ok": False, "error": "无有效持仓监控"}), 400 + conn.commit() mon_d = dict(mon) - err = _roll_eligibility(conn, mon_d) + err = _roll_eligibility(conn, mon_d, roll_ctx) if err: conn.close() return jsonify({"ok": False, "error": err}), 400 @@ -3714,11 +4234,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn.close() if not ok: return jsonify({"ok": False, "error": msg}), 400 - return jsonify({"ok": True, "message": msg, "pending": True}) + note = "已提交监控,开盘触价后自动市价加仓" if not is_trading_session() else msg + return jsonify({"ok": True, "message": note, "pending": True}) if not is_trading_session(): conn.close() return jsonify({"ok": False, "error": "不在交易时间段"}), 403 - if not ctp_status(mode).get("connected"): + if not _cached_ctp_status(mode).get("connected"): conn.close() return jsonify({"ok": False, "error": "请先连接 CTP"}), 400 ok, msg = _commit_roll_fill( @@ -4035,6 +4556,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode = get_trading_mode(get_setting) connected = bool(ctp_status(mode).get("connected")) now = _time.time() + since_connect = now - float( + getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0, + ) + if connected and since_connect < 45: + return _refresh_trading_live_snapshot(fast=True) need_full = ( connected and ( @@ -4044,42 +4570,41 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ) if need_full: _last_full_calibrate["ts"] = now - payload = _refresh_trading_live_snapshot(fast=False) - else: - payload = _refresh_trading_live_snapshot(fast=True) - return payload + return _refresh_trading_live_snapshot(fast=False) + return _refresh_trading_live_snapshot(fast=True) start_position_worker( refresh_fn=_position_worker_refresh, interval=1, idle_interval=3, ) - _bootstrap_trading_runtime() - start_ctp_reconnect_worker( - get_mode_fn=lambda: get_trading_mode(get_setting), - get_setting_fn=get_setting, - ) - start_ctp_premarket_connect_worker( - get_mode_fn=lambda: get_trading_mode(get_setting), - get_setting_fn=get_setting, - ) - start_sl_tp_guard_worker( - db_path=DB_PATH, - get_mode_fn=lambda: get_trading_mode(get_setting), - init_tables_fn=_init_tables, - get_capital_fn=_capital, - get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting), - notify_fn=send_wechat_msg, - interval=1, - ) - start_pending_order_worker( - db_path=DB_PATH, - get_mode_fn=lambda: get_trading_mode(get_setting), - init_tables_fn=_init_tables, - get_capital_fn=_capital, - reconcile_fn=_reconcile_pending, - on_changed_fn=lambda: _push_position_snapshot_async(fast=False), - ) + if os.getenv("QIHUO_CTP_ROLE", "client").strip().lower() == "worker": + _bootstrap_trading_runtime() + start_ctp_reconnect_worker( + get_mode_fn=lambda: get_trading_mode(get_setting), + get_setting_fn=get_setting, + ) + start_ctp_premarket_connect_worker( + get_mode_fn=lambda: get_trading_mode(get_setting), + get_setting_fn=get_setting, + ) + start_sl_tp_guard_worker( + db_path=DB_PATH, + get_mode_fn=lambda: get_trading_mode(get_setting), + init_tables_fn=_init_tables, + get_capital_fn=_capital, + get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting), + notify_fn=send_wechat_msg, + interval=1, + ) + start_pending_order_worker( + db_path=DB_PATH, + get_mode_fn=lambda: get_trading_mode(get_setting), + init_tables_fn=_init_tables, + get_capital_fn=_capital, + reconcile_fn=_reconcile_pending, + on_changed_fn=lambda: _push_position_snapshot_async(fast=False), + ) def _start_deferred_workers() -> None: time.sleep(2) @@ -4093,11 +4618,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se get_sizing_mode_fn=lambda: get_sizing_mode(get_setting), get_fixed_lots_fn=lambda: get_fixed_lots(get_setting), ) - start_ctp_fee_worker( - get_mode_fn=lambda: get_trading_mode(get_setting), - get_setting_fn=get_setting, - set_setting_fn=set_setting, - ) + if os.getenv("QIHUO_CTP_ROLE", "client").strip().lower() == "worker": + start_ctp_fee_worker( + get_mode_fn=lambda: get_trading_mode(get_setting), + get_setting_fn=get_setting, + set_setting_fn=set_setting, + ) from ai_worker import start_ai_worker start_ai_worker( diff --git a/market_sessions.py b/market_sessions.py index 22c1626..934fe9f 100644 --- a/market_sessions.py +++ b/market_sessions.py @@ -277,9 +277,11 @@ def should_keep_ctp_connected( minutes_before: int = 30, minutes_after: int = 30, ) -> bool: - """是否处于应连接 CTP 的窗口:交易时段 + 盘前 + 盘后宽限。""" + """是否处于应连接 CTP 的窗口:交易时段 + 小节/午间休盘 + 盘前 + 盘后宽限。""" if is_trading_session(now): return True + if is_morning_break(now) or is_lunch_break(now): + return True if in_postmarket_grace_window(now, minutes_after=minutes_after): return True return in_premarket_connect_window(now, minutes_before=minutes_before) diff --git a/scripts/_check_ctp_worker_health.py b/scripts/_check_ctp_worker_health.py new file mode 100644 index 0000000..a392b21 --- /dev/null +++ b/scripts/_check_ctp_worker_health.py @@ -0,0 +1,46 @@ +"""Check qihuo web CTP status and qihuo-ctp worker health.""" +from __future__ import annotations + +import sys + +import paramiko + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") + +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) +cmds = [ + 'curl -s --max-time 8 -H "X-Qihuo-CTP-Token: qihuo-local-ctp" http://127.0.0.1:6601/health', + r'''python3 - <<'PY' +import http.cookiejar, json, sqlite3, urllib.parse, urllib.request +conn = sqlite3.connect("/opt/qihuo/futures.db") +row = conn.execute("SELECT value FROM settings WHERE key='admin_username'").fetchone() +conn.close() +user = row[0] if row else "admin" +pw = "" +for line in open("/opt/qihuo/.env", encoding="utf-8-sig", errors="replace"): + if line.startswith("ADMIN_PASSWORD="): + pw = line.split("=", 1)[1].strip().strip('"').strip("'") +jar = http.cookiejar.CookieJar() +op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) +op.open(urllib.request.Request( + "http://127.0.0.1:6600/login", + urllib.parse.urlencode({"username": user, "password": pw}).encode(), +), timeout=8) +raw = op.open("http://127.0.0.1:6600/api/ctp/status", timeout=8).read() +print(raw.decode("utf-8", "replace")[:2000]) +PY''', + "pm2 jlist | python3 -c \"import sys,json; rows=json.load(sys.stdin); print([(r.get('name'), (r.get('pm2_env') or {}).get('status'), (r.get('pm2_env') or {}).get('restart_time')) for r in rows if r.get('name') in ('qihuo','qihuo-ctp')])\"", +] +try: + for cmd in cmds: + print("===", cmd) + _, o, e = c.exec_command(cmd, timeout=30) + out = o.read().decode("utf-8", "replace") + err = e.read().decode("utf-8", "replace") + print(out[:3000]) + if err.strip(): + print("ERR:", err[:1000]) +finally: + c.close() diff --git a/scripts/_verify_ctp_isolation.py b/scripts/_verify_ctp_isolation.py new file mode 100644 index 0000000..237f57d --- /dev/null +++ b/scripts/_verify_ctp_isolation.py @@ -0,0 +1,112 @@ +"""Verify qihuo web process survives an isolated qihuo-ctp restart.""" +from __future__ import annotations + +import json +import sys +import time + +import paramiko + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") + + +def _connect() -> paramiko.SSHClient: + c = paramiko.SSHClient() + c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) + return c + + +def _run(c: paramiko.SSHClient, cmd: str, timeout: int = 60) -> str: + _, o, e = c.exec_command(cmd, timeout=timeout) + out = o.read().decode("utf-8", "replace") + err = e.read().decode("utf-8", "replace") + return out + (("\nERR:\n" + err) if err.strip() else "") + + +def _pm2(c: paramiko.SSHClient) -> dict[str, dict]: + raw = _run(c, "pm2 jlist", timeout=30) + rows = json.loads(raw) + return {r.get("name"): r for r in rows} + + +def _restart_count(row: dict) -> int: + env = row.get("pm2_env") or {} + return int(env.get("restart_time") or 0) + + +def main() -> int: + c = _connect() + try: + before = _pm2(c) + for name in ("qihuo", "qihuo-ctp"): + row = before.get(name) or {} + env = row.get("pm2_env") or {} + print( + "before", + name, + "status", + env.get("status"), + "restarts", + _restart_count(row), + "pid", + row.get("pid"), + ) + + health = _run( + c, + 'curl -s -H "X-Qihuo-CTP-Token: qihuo-local-ctp" ' + "http://127.0.0.1:6601/health", + timeout=30, + ) + print("worker_health", health[:1000]) + web = _run( + c, + "curl -s -o /dev/null -w '%{http_code}' http://127.0.0.1:6600/login", + timeout=30, + ) + print("web_login_before", web.strip()) + + print("restarting qihuo-ctp only") + print(_run(c, "pm2 restart qihuo-ctp --update-env", timeout=60)) + time.sleep(8) + + after = _pm2(c) + for name in ("qihuo", "qihuo-ctp"): + row = after.get(name) or {} + env = row.get("pm2_env") or {} + print( + "after", + name, + "status", + env.get("status"), + "restarts", + _restart_count(row), + "pid", + row.get("pid"), + ) + + web_after = _run( + c, + "curl -s -o /dev/null -w '%{http_code}' http://127.0.0.1:6600/login", + timeout=30, + ) + print("web_login_after", web_after.strip()) + + qihuo_before = _restart_count(before.get("qihuo") or {}) + qihuo_after = _restart_count(after.get("qihuo") or {}) + ctp_before = _restart_count(before.get("qihuo-ctp") or {}) + ctp_after = _restart_count(after.get("qihuo-ctp") or {}) + ok = ( + qihuo_after == qihuo_before + and ctp_after >= ctp_before + 1 + and web_after.strip() == "200" + ) + print("isolation_ok", ok) + return 0 if ok else 1 + finally: + c.close() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/deploy_pos_display_fix.py b/scripts/deploy_pos_display_fix.py new file mode 100644 index 0000000..b7082fc --- /dev/null +++ b/scripts/deploy_pos_display_fix.py @@ -0,0 +1,72 @@ +"""Deploy position display fix: stop ALTER lock + live rows from CTP.""" +import sys +import time +from pathlib import Path + +import paramiko + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") +root = Path(__file__).resolve().parents[1] +files = [ + "ctp_ipc_client.py", + "ctp_worker.py", + "ecosystem.config.cjs", + "market_sessions.py", + "trading_context.py", + "dashboard_lib.py", + "sl_tp_guard.py", + "install_trading.py", + "vnpy_bridge.py", + "static/js/trade.js", + "templates/strategy.html", + "templates/strategy_records.html", + "static/js/strategy.js", + "strategy/strategy_roll_lib.py", + "scripts/run_schema_migrate.py", +] + +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) +sftp = c.open_sftp() +for rel in files: + sftp.put(str(root / rel), f"/opt/qihuo/{rel}") + print("uploaded", rel) +sftp.close() + +cmds = [ + "cd /opt/qihuo && source venv/bin/activate && python3 scripts/run_schema_migrate.py", + "cd /opt/qihuo && source venv/bin/activate && python3 -m py_compile ctp_ipc_client.py ctp_worker.py vnpy_bridge.py sl_tp_guard.py install_trading.py", + "cd /opt/qihuo && (pm2 describe qihuo-ctp >/dev/null && pm2 restart qihuo-ctp --update-env || pm2 start ecosystem.config.cjs --only qihuo-ctp)", + "cd /opt/qihuo && (pm2 describe qihuo >/dev/null && pm2 restart qihuo --update-env || pm2 start ecosystem.config.cjs --only qihuo)", + "pm2 save", +] +for cmd in cmds: + print(">>>", cmd) + _, o, e = c.exec_command(cmd, timeout=120) + out = o.read().decode("utf-8", "replace") + err = e.read().decode("utf-8", "replace") + if out.strip(): + print(out) + if err.strip(): + print(err) + +time.sleep(25) +_, o, _ = c.exec_command( + "curl -s -o /dev/null -w 'login:%{http_code}\\n' --max-time 10 http://127.0.0.1:6600/login", + timeout=30, +) +print(o.read().decode()) + +# verify live vs account_snapshot +poll = root / "scripts" / "_poll_loop.py" +if poll.exists(): + import subprocess + + r = subprocess.run([sys.executable, str(poll)], capture_output=True, text=True, timeout=120) + print(r.stdout) + if r.stderr.strip(): + print(r.stderr) + +c.close() +print("done") diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 52e2c6c..972d7e7 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -60,13 +60,47 @@ MONITOR_ORDER_COLUMNS = ( TRADE_RESULTS = ("止损", "止盈", "移动止盈", "保本止盈", "手动平仓") +_MONITOR_COLUMNS_READY = False +_MONITOR_COLUMNS_LOCK = threading.Lock() -def ensure_monitor_order_columns(conn) -> None: - for sql in MONITOR_ORDER_COLUMNS: - try: - conn.execute(sql) - except Exception: - pass + +def _monitor_columns_exist(conn) -> bool: + try: + rows = conn.execute("PRAGMA table_info(trade_order_monitors)").fetchall() + cols = set() + for r in rows: + if isinstance(r, dict): + cols.add(r.get("name") or "") + else: + cols.add(r[1]) + return "open_fee" in cols + except Exception: + return False + + +def ensure_monitor_order_columns(conn, *, migrate: bool = False) -> None: + """列齐全后不再 ALTER,避免 worker 每次请求锁 SQLite。""" + global _MONITOR_COLUMNS_READY + if _MONITOR_COLUMNS_READY: + return + with _MONITOR_COLUMNS_LOCK: + if _MONITOR_COLUMNS_READY: + return + if _monitor_columns_exist(conn): + _MONITOR_COLUMNS_READY = True + return + if not migrate: + return + for sql in MONITOR_ORDER_COLUMNS: + try: + conn.execute(sql) + conn.commit() + except Exception: + try: + conn.rollback() + except Exception: + pass + _MONITOR_COLUMNS_READY = True def _tick_size(ths_code: str) -> float: diff --git a/static/js/strategy.js b/static/js/strategy.js index d79a67d..f9a9997 100644 --- a/static/js/strategy.js +++ b/static/js/strategy.js @@ -109,6 +109,7 @@ var breakEl = document.getElementById('roll-break-price'); var execHint = document.getElementById('roll-exec-hint'); var btnExec = document.getElementById('btn-roll-exec'); + var btnPreview = document.getElementById('btn-roll-preview'); if (!modeEl) return; var mode = modeEl.value || 'market'; var isBreak = mode === 'breakout'; @@ -120,6 +121,12 @@ if (btnExec) { btnExec.textContent = mode === 'market' ? '执行滚仓' : '提交监控'; } + if (btnPreview) { + btnPreview.disabled = !inTradingSession && !isBreak; + btnPreview.title = (!inTradingSession && !isBreak) + ? '休盘期间请切换为突破加仓' + : ''; + } } function syncRollRiskHint() { @@ -196,6 +203,7 @@ var rollPayload = null; var rollMonitorSel = document.getElementById('roll-monitor-select'); var rollModeSel = document.getElementById('roll-add-mode'); + var inTradingSession = {{ 'true' if trading_session else 'false' }}; if (rollModeSel) rollModeSel.addEventListener('change', syncRollModeUi); if (rollMonitorSel) rollMonitorSel.addEventListener('change', syncRollRiskHint); @@ -214,6 +222,7 @@ } showPreview(rollPrev, formatRoll(d.preview), true, false); btnRollE.hidden = false; + syncRollModeUi(); }).finally(function () { btnRollP.disabled = false; }); @@ -224,6 +233,10 @@ var payload = rollPayload || formData(rollForm); var mode = (payload.add_mode || 'market'); if (mode === 'market') { + if (!inTradingSession) { + alert('休盘期间请切换为「突破加仓」后提交监控'); + return; + } if (!confirm('确认执行市价滚仓?')) return; startRollCountdown(btnRollE, payload); return; diff --git a/static/js/trade.js b/static/js/trade.js index ccd7184..cb79583 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -34,6 +34,8 @@ var ctpConnecting = false; var ctpAutoConnectEnabled = true; var positionsRendered = false; + var posFastPollTimer = null; + var posFastPollCount = 0; var lastPosRowCount = 0; var selectedMaxLots = null; var recommendMaxByProduct = {}; @@ -248,6 +250,24 @@ }); } + function stopPosFastPoll() { + if (posFastPollTimer) { + clearInterval(posFastPollTimer); + posFastPollTimer = null; + } + posFastPollCount = 0; + } + + function startPosFastPoll() { + if (posFastPollTimer) return; + posFastPollCount = 0; + posFastPollTimer = setInterval(function () { + pollPositions(); + posFastPollCount += 1; + if (posFastPollCount >= 90) stopPosFastPoll(); + }, 1000); + } + function applyPositionsData(data) { if (!data) return; var cap = document.getElementById('cap-display'); @@ -312,6 +332,7 @@ if (!connected) { if (connecting) { list.innerHTML = '
CTP 连接中,请稍候…
'; + startPosFastPoll(); return; } if (cooldownSec > 0 || (data.ctp_status && data.ctp_status.last_error)) { @@ -325,8 +346,8 @@ list.innerHTML = '
' + offHint + '
'; return; } - list.innerHTML = '
CTP 未连接,正在尝试自动重连…
'; - if (ctpAutoConnectEnabled) tryAutoCtpReconnect(); + list.innerHTML = '
CTP 未连接,后台自动连接中…
'; + if (ctpAutoConnectEnabled) refreshCtpStatusPassive(); return; } var syncing = data.sync_state === 'syncing'; @@ -339,6 +360,7 @@ syncBadge.textContent = data.sync_label || '持仓同步中…'; syncBadge.className = 'sync-badge text-accent'; } + startPosFastPoll(); return; } list.innerHTML = '
暂无持仓。
'; @@ -347,9 +369,7 @@ return; } lastPosRowCount = rows.length; - if (!connected && ctpAutoConnectEnabled) { - tryAutoCtpReconnect(); - } + stopPosFastPoll(); list.innerHTML = rows.map(buildPosCard).join(''); syncPositionListScroll(rows.length); bindPendingDismiss(list); @@ -610,8 +630,9 @@ } if (st.connecting && Date.now() < deadline) { syncCtpBadgeFromStatus(st); + pollPositions(); return new Promise(function (resolve) { - setTimeout(function () { resolve(tick()); }, 2000); + setTimeout(function () { resolve(tick()); }, 800); }); } syncCtpBadgeFromStatus(st); @@ -795,18 +816,31 @@ }); } - function tryAutoCtpReconnect() { - if (!ctpAutoConnectEnabled) return; - if (ctpReconnecting || ctpConnectInflight) return; + /** 只读 CTP 状态;连接由 qihuo-ctp 后台 worker 负责,前端不发起 connect。 */ + function refreshCtpStatusPassive() { + if (ctpConnected || ctpConnecting) return; var now = Date.now(); - if (now - lastCtpReconnectAt < 60000) return; - if (lastCtpLoginBanAt && now - lastCtpLoginBanAt < 2700000) return; - if (lastCtpUnreachableAt && now - lastCtpUnreachableAt < 300000) return; + if (now - lastCtpReconnectAt < 8000) return; lastCtpReconnectAt = now; - ctpReconnecting = true; - requestCtpConnect(false).finally(function () { - ctpReconnecting = false; - }); + fetch('/api/ctp/status') + .then(function (r) { return r.json(); }) + .then(function (d) { + var st = d.status || {}; + syncCtpBadgeFromStatus(st); + if (st.connected) { + showCtpError(''); + pollPositions(); + startPosFastPoll(); + } else if (st.connecting) { + updateCtpBadge(false, true); + startPosFastPoll(); + } else if (st.last_error) { + showCtpError(st.last_error); + } else if (st.disabled_hint) { + showCtpError(st.disabled_hint); + } + }) + .catch(function () {}); } function showOrderMsg(text, ok) { @@ -1911,12 +1945,22 @@ } else if (st.last_error) { showCtpError(st.last_error); } - if (st.connected) pollPositions(); + if (st.connected) { + pollPositions(); + startPosFastPoll(); + } else if (st.connecting) { + startPosFastPoll(); + waitForCtpConnected(90000); + } else if (ctpAutoConnectEnabled && !(st.login_cooldown_sec > 0)) { + refreshCtpStatusPassive(); + startPosFastPoll(); + } }) .catch(function () {}); } function cleanupTradePage() { + stopPosFastPoll(); if (sessionClockTickTimer) { clearInterval(sessionClockTickTimer); sessionClockTickTimer = null; diff --git a/strategy/strategy_roll_lib.py b/strategy/strategy_roll_lib.py index fc2866c..c153752 100644 --- a/strategy/strategy_roll_lib.py +++ b/strategy/strategy_roll_lib.py @@ -170,6 +170,7 @@ def validate_roll_geometry( limit_price: Optional[float] = None, breakthrough_price: Optional[float] = None, at_trigger: bool = False, + off_session_pending: bool = False, ) -> Optional[str]: """几何校验。 @@ -206,6 +207,12 @@ def validate_roll_geometry( trigger = float(breakthrough_price or 0) if trigger <= 0: return "须填写突破价" + if off_session_pending: + if direction == "long" and not (sl < trigger): + return "做多突破:休盘提交须满足 止损 < 突破价" + if direction == "short" and not (trigger < sl): + return "做空突破:休盘提交须满足 突破价 < 止损" + return None if at_trigger: if direction == "long": if not (sl < trigger <= mark): @@ -269,12 +276,15 @@ def preview_roll( fib_lower: Optional[float] = None, legs_done: int = 0, at_trigger: bool = False, + off_session_pending: bool = False, ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: direction = (direction or "long").strip().lower() if legs_done >= max_roll_legs(direction): return None, f"滚仓已达 {max_roll_legs(direction)} 次上限" mode = (add_mode or ADD_MODE_MARKET).strip().lower() mark = float(mark_price or add_price or 0) + if mark <= 0 and mode == ADD_MODE_BREAKOUT and off_session_pending: + mark = float(breakthrough_price or 0) if mark <= 0: return None, "需要有效参考价" sl = float(new_stop_loss) @@ -314,6 +324,7 @@ def preview_roll( limit_price=trigger_price if mode in FIB_MODES else None, breakthrough_price=trigger_price if mode == ADD_MODE_BREAKOUT else None, at_trigger=at_trigger, + off_session_pending=off_session_pending and is_pending, ) if geom_err: return None, geom_err diff --git a/templates/strategy.html b/templates/strategy.html index d5ff2c0..6211a4b 100644 --- a/templates/strategy.html +++ b/templates/strategy.html @@ -136,7 +136,10 @@ - + + {% if not trading_session %} +

当前{{ session_clock.status_label or '休盘' }}:请选「突破加仓」填写突破价后预览并提交监控。

+ {% endif %} {% else %}

暂无可用持仓监控

@@ -151,7 +154,7 @@
- + {% for g in roll_groups %} @@ -160,6 +163,8 @@ + + @@ -172,12 +177,12 @@ {% else %}

暂无

{% endif %} -

最近滚仓腿

+

正在滚仓

{% if roll_legs %}
ID品种方向腿数首仓TP当前SL当前均价止盈盈利(元)ID品种方向腿数首仓手数当前总手数首仓TP当前SL当前均价止盈盈利(元)
{{ g.symbol_name or g.symbol }} {{ '多' if g.direction == 'long' else '空' }} {{ g.leg_count or 0 }}/3{{ g.first_lots if g.first_lots is not none else '—' }}{{ g.total_lots if g.total_lots is not none else '—' }} {{ g.initial_take_profit or '—' }} {{ g.current_stop_loss or '—' }} {{ g.avg_entry or '—' }}
- + {% for l in roll_legs %} @@ -188,6 +193,7 @@ + diff --git a/templates/strategy_records.html b/templates/strategy_records.html index 78f2d2a..086572a 100644 --- a/templates/strategy_records.html +++ b/templates/strategy_records.html @@ -1,6 +1,15 @@ {# Copyright (c) 2025-2026 马建军. All rights reserved. 专有软件,详见 LICENSE.zh-CN.txt #} {% extends "base.html" %} {% block title %}策略记录 - 国内期货 · 交易复盘系统{% endblock %} +{% block extra_css %} + +{% endblock %} {% block content %}
@@ -15,7 +24,33 @@

顺势加仓

{% if roll_rows %}
    {% for r in roll_rows %} -
  • {{ r.symbol }} {{ r.result_label }} · {{ r.closed_at or r.created_at }}
  • +
  • +
    + {{ r.symbol }} {{ r.result_label }} · {{ r.closed_at or r.created_at }} +
    +

    方向:{{ '多' if r.direction == 'long' else '空' }}

    +

    首仓手数:{{ r.detail.first_lots or '—' }} · 加仓次数:{{ r.detail.add_count or 0 }} · 加仓手数:{{ r.detail.add_lots or 0 }} · 当前总手数:{{ r.detail.total_lots or '—' }}

    +

    最新止损:{{ r.detail.latest_stop_loss or '—' }} · 平仓价格:{{ r.detail.close_price or '—' }} · 盈利情况:{{ r.detail.pnl if r.detail.pnl is not none else '—' }}

    + {% if r.detail.legs %} +
#方式手数触发/限价新SL状态操作#方式手数触发/限价新SL当前价状态操作
{{ l.lots or '—' }} {{ l.breakthrough_price or l.limit_price or l.fill_price or '—' }} {{ l.new_stop_loss or '—' }}{{ l.current_price if l.current_price is not none else '—' }} {{ roll_leg_status_labels.get(l.status, l.status) }}{% if l.status == 'invalidated' and l.invalidated_reason %} · {{ l.invalidated_reason[:24] }}{% endif %} {% if l.status == 'pending' %}{% else %}—{% endif %}
+ + + {% for l in r.detail.legs %} + + + + + + + + + {% endfor %} + +
方式手数成交/触发价新SL时间
{{ l.leg_index or loop.index }}{{ l.add_mode }}{{ l.lots or '—' }}{{ l.fill_price or l.breakthrough_price or l.limit_price or '—' }}{{ l.new_stop_loss or '—' }}{{ l.created_at or '—' }}
+ {% endif %} +
+ + {% endfor %} {% else %}

暂无记录

{% endif %} diff --git a/trading_context.py b/trading_context.py index 700b067..169a6ba 100644 --- a/trading_context.py +++ b/trading_context.py @@ -79,8 +79,47 @@ def get_pending_order_timeout_sec(get_setting: Callable[[str, str], str]) -> int return get_pending_order_timeout_min(get_setting) * 60 +def _cached_ctp_account(mode: str) -> dict[str, float]: + """CTP 未连接时,用最近一次 worker/持仓快照里的账户权益。""" + import json + + try: + from position_stream import position_hub + + snap = position_hub.get_snapshot() or {} + cap = float(snap.get("capital") or 0) + if cap > 0: + return {"balance": cap} + except Exception: + pass + try: + from db_conn import connect_db + + conn = connect_db() + try: + row = conn.execute( + "SELECT value FROM ctp_worker_snapshots WHERE key='account' LIMIT 1" + ).fetchone() + finally: + conn.close() + if row and row["value"]: + acc = json.loads(row["value"]) + balance = float(acc.get("balance") or 0) + available = acc.get("available") + out: dict[str, float] = {} + if balance > 0: + out["balance"] = balance + if available is not None: + out["available"] = float(available) + return out + except Exception: + pass + del mode + return {} + + def get_account_capital(conn, get_setting: Callable[[str, str], str]) -> float: - """优先 SimNow/期货公司 CTP 权益;未连接时用设置中的参考资金。""" + """优先 SimNow/期货公司 CTP 权益;未连接时用最近快照或设置中的参考资金。""" del conn mode = get_trading_mode(get_setting) try: @@ -93,6 +132,10 @@ def get_account_capital(conn, get_setting: Callable[[str, str], str]) -> float: return float(bal) except Exception: pass + cached = _cached_ctp_account(mode) + balance = float(cached.get("balance") or 0) + if balance > 0: + return balance try: return float(get_setting("live_capital", "0") or 0) except (TypeError, ValueError): diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 6e24a56..ee0b040 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -14,9 +14,11 @@ import time from collections import deque from typing import Any, Callable, Optional +import ctp_ipc_client from locale_fix import ensure_process_locale -ensure_process_locale() +if ctp_ipc_client.is_worker_role(): + ensure_process_locale() from ctp_settings import live_setting_dict, simnow_setting_dict from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange @@ -34,6 +36,10 @@ CTP_COOLDOWN_UNTIL_KEY = "ctp_login_cooldown_until" CTP_LAST_ERROR_KEY = "ctp_last_error" +def _use_ctp_worker_client() -> bool: + return not ctp_ipc_client.is_worker_role() + + def _persist_login_cooldown(seconds: float) -> None: from fee_specs import get_setting, set_setting @@ -163,7 +169,7 @@ def _fire_position_refresh_callback_debounced(*, min_interval: float = 0.35) -> def _fire_position_refresh_burst() -> None: """连接后持仓回报可能分批到达,分多次触发快照刷新。""" _fire_position_refresh_callback() - for delay in (1.5, 4.0, 10.0, 18.0): + for delay in (0.4, 0.9, 1.5, 3.0, 6.0, 12.0, 20.0): threading.Timer(delay, _fire_position_refresh_callback).start() @@ -183,10 +189,11 @@ def _schedule_after_instruments_ready(bridge: "CtpBridge") -> None: bridge._ensure_instrument_margin_hooks() with _ctp_td_lock: bridge.request_position_snapshot(force=True) - time.sleep(2.0) + time.sleep(0.8) with _ctp_td_lock: bridge.calibrate_trading_state() _fire_position_refresh_callback() + _fire_position_refresh_burst() n = len(bridge._collect_positions()) logger.info("CTP 合约加载完成,持仓 %s 条,已刷新快照", n) except Exception as exc: @@ -217,7 +224,7 @@ _bridge: Optional["CtpBridge"] = None _bridge_lock = threading.Lock() _ctp_td_lock = threading.RLock() POSITION_QUERY_MIN_INTERVAL_SEC = 5.0 -POSITION_QUERY_RETRY_DELAYS_SEC = (22.0, 50.0, 95.0) +POSITION_QUERY_RETRY_DELAYS_SEC = (1.5, 4.0, 9.0, 18.0, 35.0) TRADE_QUERY_MIN_INTERVAL_SEC = 10.0 @@ -337,6 +344,7 @@ class CtpBridge: self._trade_query_event = threading.Event() self._last_trade_query_ts: float = 0.0 self._last_connect_ok_ts: float = 0.0 + self._connect_started_ts: float = 0.0 self._tick_hooked = False self._position_hooked = False self._order_hooked = False @@ -704,6 +712,16 @@ class CtpBridge: cooldown = self.login_cooldown_remaining() connecting = bool(self._connect_in_progress and cooldown <= 0) last_error = self._last_error or _load_persisted_last_error() + if ( + connecting + and self._connect_started_ts > 0 + and time.time() - self._connect_started_ts > CONNECT_WAIT_SEC + 10 + and not last_error + ): + last_error = ( + f"CTP 连接进行中已超过 {CONNECT_WAIT_SEC}s," + "可能前置不可达或柜台响应慢" + ) return { "vnpy_installed": self.available(), "connected": self._connected_mode == mode, @@ -746,6 +764,7 @@ class CtpBridge: raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址") self._connect_in_progress = True + self._connect_started_ts = time.time() try: with _ctp_td_lock: with self._connect_lock: @@ -806,6 +825,10 @@ class CtpBridge: self.calibrate_trading_state() except Exception as exc: logger.debug("post-connect calibrate: %s", exc) + try: + self.request_position_snapshot(force=True) + except Exception as exc: + logger.debug("post-connect position query: %s", exc) self._ensure_instrument_margin_hooks() _fire_position_refresh_burst() _schedule_position_query_retries(self) @@ -823,6 +846,7 @@ class CtpBridge: raise RuntimeError(hint) finally: self._connect_in_progress = False + self._connect_started_ts = 0.0 def start_connect_async( self, mode: str, *, force: bool = False, scheduled: bool = False, @@ -859,13 +883,39 @@ class CtpBridge: except Exception as exc: logger.warning("CTP 后台连接失败: %s", exc) + def _watchdog() -> None: + deadline = CONNECT_WAIT_SEC + 25 + time.sleep(deadline) + if not self._connect_in_progress: + return + logger.warning( + "CTP 连接 watchdog 超时 %.0fs,重置连接状态 [%s]", + deadline, + mode, + ) + self._connect_in_progress = False + self._connect_started_ts = 0.0 + hint = ( + f"CTP 连接超时(>{deadline:.0f}s),可能前置不可达或柜台无响应。" + "请检查 SimNow 前置地址与账号,勿频繁重试。" + ) + self._last_error = hint + _persist_last_error(hint) + try: + self._close_gateway() + except Exception as exc: + logger.debug("watchdog gateway close: %s", exc) + threading.Thread(target=_run, daemon=True, name="ctp-connect-async").start() + threading.Thread(target=_watchdog, daemon=True, name="ctp-connect-watchdog").start() return {"started": True, "connecting": True, "connected": False} def ensure_connected(self, mode: str) -> None: if self._connected_mode == mode and self.ping(): return - self.connect(mode) + if self._connect_in_progress: + raise RuntimeError("CTP 连接中,请稍候") + raise RuntimeError("请先连接 CTP") def require_connected(self, mode: str) -> None: """报单前检查:须已连接,不在此发起阻塞式 connect。""" @@ -2125,8 +2175,170 @@ class CtpBridge: return False -def get_bridge() -> CtpBridge: +class CtpBridgeProxy: + """Client-side stand-in for CtpBridge, forwarding calls to qihuo-ctp.""" + + _engine = None + + @property + def connected_mode(self) -> Optional[str]: + st = ctp_ipc_client.health().get("status") or {} + return st.get("connected_mode") + + @property + def last_error(self) -> str: + st = ctp_ipc_client.health().get("status") or {} + return str(st.get("last_error") or "") + + @property + def _last_connect_ok_ts(self) -> float: + st = ctp_ipc_client.health().get("status") or {} + try: + return float(st.get("last_connect_ok_ts") or 0) + except (TypeError, ValueError): + return 0.0 + + def available(self) -> bool: + return bool(ctp_ipc_client.health().get("worker_online")) + + def status(self, mode: str) -> dict[str, Any]: + return ctp_ipc_client.status(mode) + + def ping(self) -> bool: + return bool(ctp_ipc_client.health().get("worker_online")) + + def connect(self, mode: str, *, force: bool = False) -> dict[str, Any]: + return ctp_ipc_client.connect(mode, force=force) + + def start_connect_async( + self, + mode: str, + *, + force: bool = False, + scheduled: bool = False, + ) -> dict[str, Any]: + return ctp_ipc_client.start_connect(mode, force=force, scheduled=scheduled) + + def connect_in_progress(self) -> bool: + data = ctp_ipc_client.bridge_action("connect_in_progress") + return bool(data.get("result")) + + def login_cooldown_remaining(self) -> int: + st = ctp_ipc_client.health().get("status") or {} + try: + return int(st.get("login_cooldown_sec") or 0) + except (TypeError, ValueError): + return 0 + + def ensure_connected(self, mode: str) -> None: + if not self.status(mode).get("connected"): + raise RuntimeError("CTP worker 未连接,请重连后再操作") + + def require_connected(self, mode: str) -> None: + self.ensure_connected(mode) + + def get_account(self) -> dict[str, Any]: + mode = self.connected_mode or "simulation" + return ctp_ipc_client.account(mode) + + def list_positions( + self, + *, + refresh_if_empty: bool = True, + refresh_margin: bool = False, + ) -> list[dict[str, Any]]: + mode = self.connected_mode or "simulation" + return ctp_ipc_client.positions( + mode, + refresh_if_empty=refresh_if_empty, + refresh_margin=refresh_margin, + ) + + def list_active_orders(self) -> list[dict[str, Any]]: + mode = self.connected_mode or "simulation" + return ctp_ipc_client.active_orders(mode) + + def list_trades(self, *, refresh: bool = False) -> list[dict[str, Any]]: + mode = self.connected_mode or "simulation" + return ctp_ipc_client.trades(mode, refresh=refresh) + + def get_tick_price(self, ths_code: str, *, mode: str = "") -> Optional[float]: + return ctp_ipc_client.tick_price(mode or self.connected_mode or "simulation", ths_code) + + def get_tick_detail(self, ths_code: str, *, mode: str = "") -> dict[str, Any]: + return ctp_ipc_client.tick_detail(mode or self.connected_mode or "simulation", ths_code) + + def estimate_margin_one_lot( + self, + ths_code: str, + price: float, + *, + direction: str = "long", + ) -> Optional[float]: + return ctp_ipc_client.estimate_margin_one_lot( + self.connected_mode or "simulation", + ths_code, + price, + direction=direction, + ) + + def lookup_contract_spec(self, ths_code: str) -> Optional[dict]: + return ctp_ipc_client.contract_spec(self.connected_mode or "simulation", ths_code) + + def send_order(self, **payload: Any) -> str: + data = ctp_ipc_client.send_order(payload) + return str(data.get("order_id") or "") + + def cancel_order(self, vt_orderid: str) -> bool: + return ctp_ipc_client.cancel_order(self.connected_mode or "simulation", vt_orderid) + + def calibrate_trading_state(self) -> Any: + return ctp_ipc_client.bridge_action("calibrate_trading_state").get("result") + + def request_position_snapshot(self, *, force: bool = False) -> Any: + return ctp_ipc_client.bridge_action( + "request_position_snapshot", + {"force": bool(force)}, + ).get("result") + + def subscribe_symbol(self, symbol: str) -> Any: + return ctp_ipc_client.bridge_action("subscribe_symbol", {"symbol": symbol}).get("result") + + def refresh_positions(self) -> Any: + return ctp_ipc_client.bridge_action("refresh_positions").get("result") + + def reconnect_after_settings_saved(self, mode: str) -> Any: + return ctp_ipc_client.bridge_action( + "reconnect_after_settings_saved", + {"mode": mode}, + ).get("result") + + def query_all_commissions(self, *, mode: str = "") -> list[dict]: + data = ctp_ipc_client.bridge_action("query_all_commissions", {"mode": mode}) + return list(data.get("result") or []) + + def query_instrument_commission(self, symbol: str, *, mode: str = "") -> dict: + data = ctp_ipc_client.bridge_action( + "query_instrument_commission", + {"symbol": symbol, "mode": mode or self.connected_mode or "simulation"}, + ) + return dict(data.get("result") or {}) + + def get_kline_bars_1m(self, ths_code: str, *, mode: str) -> list[dict]: + data = ctp_ipc_client.bridge_action( + "get_kline_bars_1m", + {"symbol": ths_code, "mode": mode}, + ) + return list(data.get("result") or []) + + def _close_gateway(self) -> None: + ctp_ipc_client.disconnect() + + +def get_bridge(): global _bridge + if _use_ctp_worker_client(): + return CtpBridgeProxy() with _bridge_lock: if _bridge is None: _bridge = CtpBridge() @@ -2134,10 +2346,14 @@ def get_bridge() -> CtpBridge: def try_init_vnpy(_settings: dict | None = None) -> bool: + if _use_ctp_worker_client(): + return bool(ctp_ipc_client.health().get("worker_online")) return get_bridge().available() def vnpy_available() -> bool: + if _use_ctp_worker_client(): + return bool(ctp_ipc_client.health().get("worker_online")) return get_bridge().available() @@ -2156,6 +2372,9 @@ def _ctp_connect_permitted(*, scheduled: bool = False) -> bool: def ctp_disconnect(*, set_disabled_hint: bool = False) -> None: """主动断开 CTP 并清理内存状态。""" + if _use_ctp_worker_client(): + ctp_ipc_client.disconnect(set_disabled_hint=set_disabled_hint) + return from ctp_settings import CTP_DISABLED_HINT b = get_bridge() @@ -2169,6 +2388,8 @@ def ctp_disconnect(*, set_disabled_hint: bool = False) -> None: def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]: + if _use_ctp_worker_client(): + return ctp_ipc_client.connect(mode, force=force) b = get_bridge() b.connect(mode, force=force) return b.status(mode) @@ -2176,6 +2397,8 @@ def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]: def ctp_start_connect(mode: str, *, force: bool = False, scheduled: bool = False) -> dict[str, Any]: """非阻塞发起连接,供 Web API 使用。""" + if _use_ctp_worker_client(): + return ctp_ipc_client.start_connect(mode, force=force, scheduled=scheduled) b = get_bridge() info = b.start_connect_async(mode, force=force, scheduled=scheduled) st = b.status(mode) @@ -2184,6 +2407,13 @@ def ctp_start_connect(mode: str, *, force: bool = False, scheduled: bool = False def ctp_try_auto_reconnect(mode: str) -> bool: """断线时静默异步重连;已连接且交易通道正常则不再重复 connect。""" + if _use_ctp_worker_client(): + info = ctp_ipc_client.start_connect(mode, force=False, scheduled=True) + return bool( + info.get("connected") + or info.get("connecting") + or info.get("started") + ) if not _ctp_connect_permitted(scheduled=True): return False b = get_bridge() @@ -2222,6 +2452,10 @@ def ctp_try_auto_reconnect(mode: str) -> bool: def ctp_status(mode: str) -> dict[str, Any]: from ctp_settings import CTP_DISABLED_HINT, is_ctp_auto_connect_enabled + if _use_ctp_worker_client(): + st = ctp_ipc_client.status(mode) + st["auto_connect_enabled"] = is_ctp_auto_connect_enabled() + return st auto = is_ctp_auto_connect_enabled() st = get_bridge().status(mode) st["auto_connect_enabled"] = auto @@ -2245,6 +2479,8 @@ def ctp_status(mode: str) -> dict[str, Any]: def ctp_get_account(mode: str) -> dict[str, Any]: + if _use_ctp_worker_client(): + return ctp_ipc_client.account(mode) b = get_bridge() b.ensure_connected(mode) return b.get_account() @@ -2269,6 +2505,18 @@ def ctp_sum_position_margins( def ctp_account_margin_used(mode: str) -> Optional[float]: """账户实际占用保证金 ≈ 权益 − 可用(与顶栏柜台资金一致)。""" + if _use_ctp_worker_client(): + try: + acc = ctp_ipc_client.account(mode) + balance = float(acc.get("balance") or 0) + available = float(acc.get("available") or 0) + if balance <= 0: + return None + used = balance - available + return round(used, 2) if used > 0 else None + except Exception as exc: + logger.debug("ctp_account_margin_used ipc: %s", exc) + return None b = get_bridge() if b.connected_mode != mode or not b.ping(): return None @@ -2291,6 +2539,12 @@ def ctp_list_positions( refresh_if_empty: bool = True, refresh_margin: bool = False, ) -> list[dict[str, Any]]: + if _use_ctp_worker_client(): + return ctp_ipc_client.positions( + mode, + refresh_if_empty=refresh_if_empty, + refresh_margin=refresh_margin, + ) b = get_bridge() if b.connected_mode != mode or not b.ping(): return [] @@ -2298,18 +2552,24 @@ def ctp_list_positions( def ctp_list_active_orders(mode: str) -> list[dict[str, Any]]: + if _use_ctp_worker_client(): + return ctp_ipc_client.active_orders(mode) b = get_bridge() b.ensure_connected(mode) return b.list_active_orders() def ctp_cancel_order(mode: str, vt_orderid: str) -> bool: + if _use_ctp_worker_client(): + return ctp_ipc_client.cancel_order(mode, vt_orderid) b = get_bridge() b.ensure_connected(mode) return b.cancel_order(vt_orderid) def ctp_list_trades(mode: str, *, refresh: bool = False) -> list[dict[str, Any]]: + if _use_ctp_worker_client(): + return ctp_ipc_client.trades(mode, refresh=refresh) b = get_bridge() if b.connected_mode != mode or not b.ping(): return [] @@ -2318,6 +2578,8 @@ def ctp_list_trades(mode: str, *, refresh: bool = False) -> list[dict[str, Any]] def ctp_get_tick_price(mode: str, ths_code: str) -> Optional[float]: """CTP 柜台最新价(需已连接并订阅)。""" + if _use_ctp_worker_client(): + return ctp_ipc_client.tick_price(mode, ths_code) b = get_bridge() if b.connected_mode != mode: return None @@ -2329,6 +2591,8 @@ def ctp_get_tick_price(mode: str, ths_code: str) -> Optional[float]: def ctp_get_tick_detail(mode: str, ths_code: str) -> dict[str, Any]: + if _use_ctp_worker_client(): + return ctp_ipc_client.tick_detail(mode, ths_code) b = get_bridge() if b.connected_mode != mode: return {} @@ -2346,6 +2610,13 @@ def ctp_estimate_margin_one_lot( *, direction: str = "long", ) -> Optional[float]: + if _use_ctp_worker_client(): + return ctp_ipc_client.estimate_margin_one_lot( + mode, + ths_code, + price, + direction=direction, + ) b = get_bridge() if b.connected_mode != mode or not b.ping(): return None @@ -2357,6 +2628,8 @@ def ctp_estimate_margin_one_lot( def ctp_lookup_contract_spec(mode: str, ths_code: str) -> Optional[dict]: + if _use_ctp_worker_client(): + return ctp_ipc_client.contract_spec(mode, ths_code) b = get_bridge() if b.connected_mode != mode or not b.ping(): return None @@ -2390,6 +2663,17 @@ def execute_order( order_type: str = "limit", ) -> dict[str, Any]: """统一下单:simulation=SimNow,live=期货公司 CTP。""" + if _use_ctp_worker_client(): + return ctp_ipc_client.send_order({ + "mode": mode, + "offset": offset, + "symbol": symbol, + "direction": direction, + "lots": lots, + "price": price, + "settings": settings or {}, + "order_type": order_type, + }) del conn, settings if mode not in ("simulation", "live"): raise ValueError("未知交易模式")