Files
qihuo/ctp_worker.py
T
dekun 9cd81a3ea7 Isolate CTP in worker process and improve strategy roll UX.
Split vn.py into qihuo-ctp worker with IPC client bridge, keep CTP connected during breaks with cached account fallback, speed up strategy page loads, and allow off-session breakout roll submissions.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-07-01 12:37:44 +08:00

495 lines
15 KiB
Python

# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""Isolated local CTP worker.
This process is the only process that should instantiate vn.py / vnpy_ctp.
The Flask web app talks to it through localhost HTTP via ctp_ipc_client.py.
"""
from __future__ import annotations
import logging
import os
import threading
import time
from typing import Any
os.environ.setdefault("QIHUO_CTP_ROLE", "worker")
from flask import Flask, jsonify, request
from ctp_ipc_client import worker_token
from db_conn import DB_PATH, commit_retry, connect_db
from fee_specs import get_setting, set_setting
from locale_fix import ensure_process_locale
from market_sessions import is_trading_session
from sl_tp_guard import check_sl_tp_on_tick, ensure_monitor_order_columns, start_sl_tp_guard_worker
from strategy.strategy_db import init_strategy_tables
from trading_context import get_account_capital, get_trading_mode, get_trailing_be_tick_buffer
from vnpy_bridge import (
_ctp_td_lock,
ctp_cancel_order,
ctp_disconnect,
ctp_estimate_margin_one_lot,
ctp_get_account,
ctp_get_tick_detail,
ctp_get_tick_price,
ctp_list_active_orders,
ctp_list_positions,
ctp_list_trades,
ctp_lookup_contract_spec,
ctp_start_connect,
ctp_status,
ctp_try_auto_reconnect,
execute_order,
get_bridge,
set_ctp_connected_callback,
set_position_refresh_callback,
set_tick_quote_callback,
set_tick_sl_tp_callback,
try_init_vnpy,
)
logging.basicConfig(
level=os.getenv("LOG_LEVEL", "INFO"),
format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
logger = logging.getLogger(__name__)
app = Flask(__name__)
_started_workers = False
_last_snapshot_ts = 0.0
_snapshot_lock = threading.Lock()
def _json_ok(**payload: Any):
return jsonify({"ok": True, **payload})
def _json_error(exc: Exception, *, status_code: int = 500):
return jsonify({"ok": False, "error": str(exc)}), status_code
def _require_token() -> None:
expected = worker_token()
got = request.headers.get("X-Qihuo-CTP-Token", "")
if expected and got != expected:
raise PermissionError("unauthorized")
@app.before_request
def _auth():
_require_token()
@app.errorhandler(Exception)
def _handle_error(exc: Exception):
code = 401 if isinstance(exc, PermissionError) else 500
logger.warning("ctp worker request failed: %s", exc)
return _json_error(exc, status_code=code)
def _mode_from_request() -> str:
data = request.get_json(silent=True) or {}
return (
data.get("mode")
or request.args.get("mode")
or get_trading_mode(get_setting)
or "simulation"
)
def _fast_status(mode: str) -> dict[str, Any]:
"""Return worker/native bridge state without slow network probing."""
from ctp_settings import CTP_DISABLED_HINT, is_ctp_auto_connect_enabled
try:
st = dict(get_bridge().status(mode) or {})
except Exception as exc:
st = {
"connected": False,
"connecting": False,
"connected_mode": None,
"last_error": str(exc),
"mode_label": "SimNow" if mode == "simulation" else "期货公司实盘",
}
auto = is_ctp_auto_connect_enabled()
st["auto_connect_enabled"] = auto
st["worker_online"] = True
if not auto:
st["disabled_hint"] = CTP_DISABLED_HINT
if not st.get("connected") and not st.get("connecting"):
st["last_error"] = ""
st["td_reachable"] = None
return st
def _send_wechat_msg(content: str) -> None:
webhook = get_setting("wechat_webhook", "")
if not webhook:
return
try:
import requests
requests.post(
webhook,
json={"msgtype": "text", "text": {"content": f"【国内期货】\n{content}"}},
timeout=10,
)
except Exception as exc:
logger.debug("wechat notify failed: %s", exc)
def _init_worker_tables(conn) -> None:
init_strategy_tables(conn)
ensure_monitor_order_columns(conn)
def _capital(conn) -> float:
try:
return float(get_account_capital(get_setting, conn=conn) or 0)
except Exception:
return 0.0
def _persist_snapshot(mode: str) -> None:
global _last_snapshot_ts
with _snapshot_lock:
now = time.time()
if now - _last_snapshot_ts < 0.25:
return
_last_snapshot_ts = now
try:
import json
st = _fast_status(mode)
positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False)
account = ctp_get_account(mode) if st.get("connected") else {}
conn = connect_db(DB_PATH)
try:
conn.execute(
"""CREATE TABLE IF NOT EXISTS ctp_worker_snapshots (
key TEXT PRIMARY KEY,
value TEXT,
updated_at REAL
)"""
)
for key, value in (
("status", st),
("positions", positions),
("account", account),
):
conn.execute(
"""INSERT INTO ctp_worker_snapshots(key, value, updated_at)
VALUES(?,?,?)
ON CONFLICT(key) DO UPDATE SET
value=excluded.value,
updated_at=excluded.updated_at""",
(key, json.dumps(value, ensure_ascii=False), now),
)
commit_retry(conn)
finally:
conn.close()
except Exception as exc:
logger.debug("persist ctp snapshot: %s", exc)
def _on_position_refresh() -> None:
try:
_persist_snapshot(get_trading_mode(get_setting))
except Exception as exc:
logger.debug("position refresh callback: %s", exc)
def _on_tick_quote() -> None:
_on_position_refresh()
def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None:
mode = get_trading_mode(get_setting)
if not ctp_status(mode).get("connected"):
return
conn = connect_db(DB_PATH)
try:
_init_worker_tables(conn)
capital = _capital(conn)
n = check_sl_tp_on_tick(
conn,
mode,
exchange,
symbol,
price,
capital=capital,
notify_fn=_send_wechat_msg,
be_tick_mult=get_trailing_be_tick_buffer(get_setting),
)
if n:
commit_retry(conn)
_persist_snapshot(mode)
except Exception as exc:
logger.warning("worker tick sl/tp: %s", exc)
finally:
conn.close()
def _on_ctp_connected(mode: str) -> None:
try:
with _ctp_td_lock:
get_bridge().request_position_snapshot(force=True)
get_bridge().calibrate_trading_state()
_persist_snapshot(mode)
except Exception as exc:
logger.debug("worker ctp connected callback: %s", exc)
def _start_background_workers() -> None:
global _started_workers
if _started_workers:
return
_started_workers = True
set_position_refresh_callback(_on_position_refresh)
set_tick_quote_callback(_on_tick_quote)
set_tick_sl_tp_callback(_on_tick_sl_tp)
set_ctp_connected_callback(_on_ctp_connected)
from ctp_fee_worker import start_ctp_fee_worker
from ctp_premarket_connect import start_ctp_premarket_connect_worker
from ctp_reconnect import start_ctp_reconnect_worker
from order_pending import reconcile_pending_orders
from pending_order_worker import start_pending_order_worker
def _mode() -> str:
return get_trading_mode(get_setting)
start_ctp_reconnect_worker(get_mode_fn=_mode, get_setting_fn=get_setting)
start_ctp_premarket_connect_worker(get_mode_fn=_mode, get_setting_fn=get_setting)
start_ctp_fee_worker(
get_mode_fn=_mode,
get_setting_fn=get_setting,
set_setting_fn=set_setting,
)
start_pending_order_worker(
db_path=DB_PATH,
get_mode_fn=_mode,
init_tables_fn=_init_worker_tables,
get_capital_fn=_capital,
reconcile_fn=reconcile_pending_orders,
on_changed_fn=lambda: _persist_snapshot(_mode()),
)
start_sl_tp_guard_worker(
db_path=DB_PATH,
get_mode_fn=_mode,
init_tables_fn=_init_worker_tables,
get_capital_fn=_capital,
get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting),
notify_fn=_send_wechat_msg,
)
def _snapshot_loop() -> None:
time.sleep(3)
while True:
try:
mode = _mode()
if _fast_status(mode).get("connected"):
_persist_snapshot(mode)
except Exception as exc:
logger.debug("worker snapshot loop: %s", exc)
time.sleep(2 if is_trading_session() else 15)
threading.Thread(target=_snapshot_loop, daemon=True, name="ctp-worker-snapshot").start()
@app.route("/health")
def health():
mode = request.args.get("mode") or get_trading_mode(get_setting)
st = _fast_status(mode)
return _json_ok(
worker_online=True,
role=os.getenv("QIHUO_CTP_ROLE", "worker"),
mode=mode,
status=st,
ts=time.time(),
)
@app.route("/ctp/status")
def api_status():
mode = _mode_from_request()
return _json_ok(status=_fast_status(mode))
@app.route("/ctp/connect", methods=["POST"])
def api_connect():
data = request.get_json(silent=True) or {}
mode = data.get("mode") or get_trading_mode(get_setting)
info = ctp_start_connect(mode, force=bool(data.get("force")))
st = info.get("status") or _fast_status(mode)
return _json_ok(status=st, **{k: v for k, v in info.items() if k != "status"})
@app.route("/ctp/start_connect", methods=["POST"])
def api_start_connect():
data = request.get_json(silent=True) or {}
mode = data.get("mode") or get_trading_mode(get_setting)
return _json_ok(**ctp_start_connect(
mode,
force=bool(data.get("force")),
scheduled=bool(data.get("scheduled")),
))
@app.route("/ctp/disconnect", methods=["POST"])
def api_disconnect():
data = request.get_json(silent=True) or {}
ctp_disconnect(set_disabled_hint=bool(data.get("set_disabled_hint")))
return _json_ok(disconnected=True)
@app.route("/ctp/account")
def api_account():
mode = _mode_from_request()
if not _fast_status(mode).get("connected"):
return _json_ok(account={})
return _json_ok(account=ctp_get_account(mode))
@app.route("/ctp/positions", methods=["POST"])
def api_positions():
data = request.get_json(silent=True) or {}
mode = data.get("mode") or get_trading_mode(get_setting)
return _json_ok(positions=ctp_list_positions(
mode,
refresh_if_empty=bool(data.get("refresh_if_empty", True)),
refresh_margin=bool(data.get("refresh_margin", False)),
))
@app.route("/ctp/trades", methods=["POST"])
def api_trades():
data = request.get_json(silent=True) or {}
mode = data.get("mode") or get_trading_mode(get_setting)
return _json_ok(trades=ctp_list_trades(mode, refresh=bool(data.get("refresh"))))
@app.route("/ctp/active_orders")
def api_active_orders():
mode = _mode_from_request()
return _json_ok(orders=ctp_list_active_orders(mode))
@app.route("/ctp/tick_price", methods=["POST"])
def api_tick_price():
data = request.get_json(silent=True) or {}
return _json_ok(price=ctp_get_tick_price(
data.get("mode") or get_trading_mode(get_setting),
data.get("symbol") or "",
))
@app.route("/ctp/tick_detail", methods=["POST"])
def api_tick_detail():
data = request.get_json(silent=True) or {}
return _json_ok(detail=ctp_get_tick_detail(
data.get("mode") or get_trading_mode(get_setting),
data.get("symbol") or "",
))
@app.route("/ctp/estimate_margin_one_lot", methods=["POST"])
def api_estimate_margin():
data = request.get_json(silent=True) or {}
return _json_ok(margin=ctp_estimate_margin_one_lot(
data.get("mode") or get_trading_mode(get_setting),
data.get("symbol") or "",
float(data.get("price") or 0),
direction=data.get("direction") or "long",
))
@app.route("/ctp/contract_spec", methods=["POST"])
def api_contract_spec():
data = request.get_json(silent=True) or {}
return _json_ok(spec=ctp_lookup_contract_spec(
data.get("mode") or get_trading_mode(get_setting),
data.get("symbol") or "",
))
@app.route("/ctp/order", methods=["POST"])
def api_order():
data = request.get_json(silent=True) or {}
mode = data.get("mode") or get_trading_mode(get_setting)
result = execute_order(
None,
mode=mode,
offset=data.get("offset") or "open",
symbol=data.get("symbol") or "",
direction=data.get("direction") or "long",
lots=int(data.get("lots") or 1),
price=float(data.get("price") or 0),
settings=data.get("settings") or {},
order_type=data.get("order_type") or "limit",
)
_persist_snapshot(mode)
return _json_ok(**result)
@app.route("/ctp/cancel", methods=["POST"])
def api_cancel():
data = request.get_json(silent=True) or {}
mode = data.get("mode") or get_trading_mode(get_setting)
cancelled = ctp_cancel_order(mode, data.get("vt_orderid") or "")
_persist_snapshot(mode)
return _json_ok(cancelled=cancelled)
@app.route("/ctp/bridge/<action>", methods=["POST"])
def api_bridge_action(action: str):
data = request.get_json(silent=True) or {}
b = get_bridge()
if action == "calibrate_trading_state":
return _json_ok(result=b.calibrate_trading_state())
if action == "request_position_snapshot":
return _json_ok(result=b.request_position_snapshot(force=bool(data.get("force"))))
if action == "subscribe_symbol":
return _json_ok(result=b.subscribe_symbol(data.get("symbol") or ""))
if action == "refresh_positions":
return _json_ok(result=b.refresh_positions())
if action == "connect_in_progress":
return _json_ok(result=b.connect_in_progress())
if action == "reconnect_after_settings_saved":
mode = data.get("mode") or get_trading_mode(get_setting)
return _json_ok(result=b.reconnect_after_settings_saved(mode))
if action == "query_all_commissions":
return _json_ok(result=b.query_all_commissions(
mode=data.get("mode") or get_trading_mode(get_setting),
))
if action == "query_instrument_commission":
return _json_ok(result=b.query_instrument_commission(
data.get("symbol") or "",
mode=data.get("mode") or get_trading_mode(get_setting),
))
if action == "get_kline_bars_1m":
return _json_ok(result=b.get_kline_bars_1m(
data.get("symbol") or "",
mode=data.get("mode") or get_trading_mode(get_setting),
))
return _json_error(ValueError(f"unsupported bridge action: {action}"), status_code=404)
def main() -> None:
ensure_process_locale()
try_init_vnpy({})
_start_background_workers()
host = os.getenv("QIHUO_CTP_WORKER_HOST", "127.0.0.1")
port = int(os.getenv("QIHUO_CTP_WORKER_PORT", "6601") or 6601)
logger.info("starting qihuo-ctp worker on %s:%s", host, port)
app.run(host=host, port=port, debug=False, threaded=True, use_reloader=False)
if __name__ == "__main__":
main()