# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """Isolated local CTP worker. This process is the only process that should instantiate vn.py / vnpy_ctp. The Flask web app talks to it through localhost HTTP via ctp_ipc_client.py. """ from __future__ import annotations import logging import os import threading import time from typing import Any os.environ.setdefault("QIHUO_CTP_ROLE", "worker") from flask import Flask, jsonify, request from ctp_ipc_client import worker_token from db_conn import DB_PATH, commit_retry, connect_db from fee_specs import get_setting, set_setting from locale_fix import ensure_process_locale from market_sessions import is_trading_session from sl_tp_guard import check_sl_tp_on_tick, ensure_monitor_order_columns, start_sl_tp_guard_worker from strategy.strategy_db import init_strategy_tables from trading_context import get_account_capital, get_trading_mode, get_trailing_be_tick_buffer from vnpy_bridge import ( _ctp_td_lock, ctp_cancel_order, ctp_disconnect, ctp_estimate_margin_one_lot, ctp_get_account, ctp_get_tick_detail, ctp_get_tick_price, ctp_list_active_orders, ctp_list_positions, ctp_list_trades, ctp_lookup_contract_spec, ctp_start_connect, ctp_status, ctp_try_auto_reconnect, execute_order, get_bridge, set_ctp_connected_callback, set_position_refresh_callback, set_tick_quote_callback, set_tick_sl_tp_callback, try_init_vnpy, ) logging.basicConfig( level=os.getenv("LOG_LEVEL", "INFO"), format="%(asctime)s %(levelname)s [%(name)s] %(message)s", ) logger = logging.getLogger(__name__) app = Flask(__name__) _started_workers = False _last_snapshot_ts = 0.0 _snapshot_lock = threading.Lock() def _json_ok(**payload: Any): return jsonify({"ok": True, **payload}) def _json_error(exc: Exception, *, status_code: int = 500): return jsonify({"ok": False, "error": str(exc)}), status_code def _require_token() -> None: expected = worker_token() got = request.headers.get("X-Qihuo-CTP-Token", "") if expected and got != expected: raise PermissionError("unauthorized") @app.before_request def _auth(): _require_token() @app.errorhandler(Exception) def _handle_error(exc: Exception): code = 401 if isinstance(exc, PermissionError) else 500 logger.warning("ctp worker request failed: %s", exc) return _json_error(exc, status_code=code) def _mode_from_request() -> str: data = request.get_json(silent=True) or {} return ( data.get("mode") or request.args.get("mode") or get_trading_mode(get_setting) or "simulation" ) def _fast_status(mode: str) -> dict[str, Any]: """Return worker/native bridge state without slow network probing.""" from ctp_settings import CTP_DISABLED_HINT, is_ctp_auto_connect_enabled try: st = dict(get_bridge().status(mode) or {}) except Exception as exc: st = { "connected": False, "connecting": False, "connected_mode": None, "last_error": str(exc), "mode_label": "SimNow" if mode == "simulation" else "期货公司实盘", } auto = is_ctp_auto_connect_enabled() st["auto_connect_enabled"] = auto st["worker_online"] = True if not auto: st["disabled_hint"] = CTP_DISABLED_HINT if not st.get("connected") and not st.get("connecting"): st["last_error"] = "" st["td_reachable"] = None return st def _send_wechat_msg(content: str) -> None: webhook = get_setting("wechat_webhook", "") if not webhook: return try: import requests requests.post( webhook, json={"msgtype": "text", "text": {"content": f"【国内期货】\n{content}"}}, timeout=10, ) except Exception as exc: logger.debug("wechat notify failed: %s", exc) def _init_worker_tables(conn) -> None: init_strategy_tables(conn) ensure_monitor_order_columns(conn) def _capital(conn) -> float: try: return float(get_account_capital(get_setting, conn=conn) or 0) except Exception: return 0.0 def _persist_snapshot(mode: str) -> None: global _last_snapshot_ts with _snapshot_lock: now = time.time() if now - _last_snapshot_ts < 0.25: return _last_snapshot_ts = now try: import json st = _fast_status(mode) positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False) account = ctp_get_account(mode) if st.get("connected") else {} conn = connect_db(DB_PATH) try: conn.execute( """CREATE TABLE IF NOT EXISTS ctp_worker_snapshots ( key TEXT PRIMARY KEY, value TEXT, updated_at REAL )""" ) for key, value in ( ("status", st), ("positions", positions), ("account", account), ): conn.execute( """INSERT INTO ctp_worker_snapshots(key, value, updated_at) VALUES(?,?,?) ON CONFLICT(key) DO UPDATE SET value=excluded.value, updated_at=excluded.updated_at""", (key, json.dumps(value, ensure_ascii=False), now), ) commit_retry(conn) finally: conn.close() except Exception as exc: logger.debug("persist ctp snapshot: %s", exc) def _on_position_refresh() -> None: try: _persist_snapshot(get_trading_mode(get_setting)) except Exception as exc: logger.debug("position refresh callback: %s", exc) def _on_tick_quote() -> None: _on_position_refresh() def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None: mode = get_trading_mode(get_setting) if not ctp_status(mode).get("connected"): return conn = connect_db(DB_PATH) try: _init_worker_tables(conn) capital = _capital(conn) n = check_sl_tp_on_tick( conn, mode, exchange, symbol, price, capital=capital, notify_fn=_send_wechat_msg, be_tick_mult=get_trailing_be_tick_buffer(get_setting), ) if n: commit_retry(conn) _persist_snapshot(mode) except Exception as exc: logger.warning("worker tick sl/tp: %s", exc) finally: conn.close() def _on_ctp_connected(mode: str) -> None: try: with _ctp_td_lock: get_bridge().request_position_snapshot(force=True) get_bridge().calibrate_trading_state() _persist_snapshot(mode) except Exception as exc: logger.debug("worker ctp connected callback: %s", exc) def _start_background_workers() -> None: global _started_workers if _started_workers: return _started_workers = True set_position_refresh_callback(_on_position_refresh) set_tick_quote_callback(_on_tick_quote) set_tick_sl_tp_callback(_on_tick_sl_tp) set_ctp_connected_callback(_on_ctp_connected) from ctp_fee_worker import start_ctp_fee_worker from ctp_premarket_connect import start_ctp_premarket_connect_worker from ctp_reconnect import start_ctp_reconnect_worker from order_pending import reconcile_pending_orders from pending_order_worker import start_pending_order_worker def _mode() -> str: return get_trading_mode(get_setting) start_ctp_reconnect_worker(get_mode_fn=_mode, get_setting_fn=get_setting) start_ctp_premarket_connect_worker(get_mode_fn=_mode, get_setting_fn=get_setting) start_ctp_fee_worker( get_mode_fn=_mode, get_setting_fn=get_setting, set_setting_fn=set_setting, ) start_pending_order_worker( db_path=DB_PATH, get_mode_fn=_mode, init_tables_fn=_init_worker_tables, get_capital_fn=_capital, reconcile_fn=reconcile_pending_orders, on_changed_fn=lambda: _persist_snapshot(_mode()), ) start_sl_tp_guard_worker( db_path=DB_PATH, get_mode_fn=_mode, init_tables_fn=_init_worker_tables, get_capital_fn=_capital, get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting), notify_fn=_send_wechat_msg, ) def _snapshot_loop() -> None: time.sleep(3) while True: try: mode = _mode() if _fast_status(mode).get("connected"): _persist_snapshot(mode) except Exception as exc: logger.debug("worker snapshot loop: %s", exc) time.sleep(2 if is_trading_session() else 15) threading.Thread(target=_snapshot_loop, daemon=True, name="ctp-worker-snapshot").start() @app.route("/health") def health(): mode = request.args.get("mode") or get_trading_mode(get_setting) st = _fast_status(mode) return _json_ok( worker_online=True, role=os.getenv("QIHUO_CTP_ROLE", "worker"), mode=mode, status=st, ts=time.time(), ) @app.route("/ctp/status") def api_status(): mode = _mode_from_request() return _json_ok(status=_fast_status(mode)) @app.route("/ctp/connect", methods=["POST"]) def api_connect(): data = request.get_json(silent=True) or {} mode = data.get("mode") or get_trading_mode(get_setting) info = ctp_start_connect(mode, force=bool(data.get("force"))) st = info.get("status") or _fast_status(mode) return _json_ok(status=st, **{k: v for k, v in info.items() if k != "status"}) @app.route("/ctp/start_connect", methods=["POST"]) def api_start_connect(): data = request.get_json(silent=True) or {} mode = data.get("mode") or get_trading_mode(get_setting) return _json_ok(**ctp_start_connect( mode, force=bool(data.get("force")), scheduled=bool(data.get("scheduled")), )) @app.route("/ctp/disconnect", methods=["POST"]) def api_disconnect(): data = request.get_json(silent=True) or {} ctp_disconnect(set_disabled_hint=bool(data.get("set_disabled_hint"))) return _json_ok(disconnected=True) @app.route("/ctp/account") def api_account(): mode = _mode_from_request() if not _fast_status(mode).get("connected"): return _json_ok(account={}) return _json_ok(account=ctp_get_account(mode)) @app.route("/ctp/positions", methods=["POST"]) def api_positions(): data = request.get_json(silent=True) or {} mode = data.get("mode") or get_trading_mode(get_setting) return _json_ok(positions=ctp_list_positions( mode, refresh_if_empty=bool(data.get("refresh_if_empty", True)), refresh_margin=bool(data.get("refresh_margin", False)), )) @app.route("/ctp/trades", methods=["POST"]) def api_trades(): data = request.get_json(silent=True) or {} mode = data.get("mode") or get_trading_mode(get_setting) return _json_ok(trades=ctp_list_trades(mode, refresh=bool(data.get("refresh")))) @app.route("/ctp/active_orders") def api_active_orders(): mode = _mode_from_request() return _json_ok(orders=ctp_list_active_orders(mode)) @app.route("/ctp/tick_price", methods=["POST"]) def api_tick_price(): data = request.get_json(silent=True) or {} return _json_ok(price=ctp_get_tick_price( data.get("mode") or get_trading_mode(get_setting), data.get("symbol") or "", )) @app.route("/ctp/tick_detail", methods=["POST"]) def api_tick_detail(): data = request.get_json(silent=True) or {} return _json_ok(detail=ctp_get_tick_detail( data.get("mode") or get_trading_mode(get_setting), data.get("symbol") or "", )) @app.route("/ctp/estimate_margin_one_lot", methods=["POST"]) def api_estimate_margin(): data = request.get_json(silent=True) or {} return _json_ok(margin=ctp_estimate_margin_one_lot( data.get("mode") or get_trading_mode(get_setting), data.get("symbol") or "", float(data.get("price") or 0), direction=data.get("direction") or "long", )) @app.route("/ctp/contract_spec", methods=["POST"]) def api_contract_spec(): data = request.get_json(silent=True) or {} return _json_ok(spec=ctp_lookup_contract_spec( data.get("mode") or get_trading_mode(get_setting), data.get("symbol") or "", )) @app.route("/ctp/order", methods=["POST"]) def api_order(): data = request.get_json(silent=True) or {} mode = data.get("mode") or get_trading_mode(get_setting) result = execute_order( None, mode=mode, offset=data.get("offset") or "open", symbol=data.get("symbol") or "", direction=data.get("direction") or "long", lots=int(data.get("lots") or 1), price=float(data.get("price") or 0), settings=data.get("settings") or {}, order_type=data.get("order_type") or "limit", ) _persist_snapshot(mode) return _json_ok(**result) @app.route("/ctp/cancel", methods=["POST"]) def api_cancel(): data = request.get_json(silent=True) or {} mode = data.get("mode") or get_trading_mode(get_setting) cancelled = ctp_cancel_order(mode, data.get("vt_orderid") or "") _persist_snapshot(mode) return _json_ok(cancelled=cancelled) @app.route("/ctp/bridge/", methods=["POST"]) def api_bridge_action(action: str): data = request.get_json(silent=True) or {} b = get_bridge() if action == "calibrate_trading_state": return _json_ok(result=b.calibrate_trading_state()) if action == "request_position_snapshot": return _json_ok(result=b.request_position_snapshot(force=bool(data.get("force")))) if action == "subscribe_symbol": return _json_ok(result=b.subscribe_symbol(data.get("symbol") or "")) if action == "refresh_positions": return _json_ok(result=b.refresh_positions()) if action == "connect_in_progress": return _json_ok(result=b.connect_in_progress()) if action == "reconnect_after_settings_saved": mode = data.get("mode") or get_trading_mode(get_setting) return _json_ok(result=b.reconnect_after_settings_saved(mode)) if action == "query_all_commissions": return _json_ok(result=b.query_all_commissions( mode=data.get("mode") or get_trading_mode(get_setting), )) if action == "query_instrument_commission": return _json_ok(result=b.query_instrument_commission( data.get("symbol") or "", mode=data.get("mode") or get_trading_mode(get_setting), )) if action == "get_kline_bars_1m": return _json_ok(result=b.get_kline_bars_1m( data.get("symbol") or "", mode=data.get("mode") or get_trading_mode(get_setting), )) return _json_error(ValueError(f"unsupported bridge action: {action}"), status_code=404) def main() -> None: ensure_process_locale() try_init_vnpy({}) _start_background_workers() host = os.getenv("QIHUO_CTP_WORKER_HOST", "127.0.0.1") port = int(os.getenv("QIHUO_CTP_WORKER_PORT", "6601") or 6601) logger.info("starting qihuo-ctp worker on %s:%s", host, port) app.run(host=host, port=port, debug=False, threaded=True, use_reloader=False) if __name__ == "__main__": main()