3a150dd3d6
Add in-memory order/position books fed by CTP events, split active orders above positions in the UI, tick-triggered local SL/TP, and 30-second full calibration. Co-authored-by: Cursor <cursoragent@cursor.com>
202 lines
6.4 KiB
Python
202 lines
6.4 KiB
Python
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 详见 LICENSE.zh-CN.txt

"""CTP 权威内存簿:委托、持仓、同步状态(事件增量 + 定期全量校准)。"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import threading
|
|
import time
|
|
from typing import Any, Callable, Optional
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Minimum age, in seconds, of the last full calibration before another one
# is considered due.
CALIBRATE_INTERVAL_SEC = 30.0
|
|
|
|
|
|
def position_key(exchange: str, symbol: str, direction: str) -> str:
    """Build the canonical position key ``EXCHANGE|symbol|direction``.

    Each part is stripped of surrounding whitespace; the exchange is
    upper-cased while the symbol and direction are lower-cased.  A missing
    or blank direction becomes ``"long"``.  When the exchange is empty the
    key has only two parts: ``symbol|direction``.

    Args:
        exchange: Exchange code; may be empty or ``None``.
        symbol: Instrument symbol; may be empty or ``None``.
        direction: Position direction; may be empty or ``None``.

    Returns:
        The normalized key string.
    """
    ex = (exchange or "").strip().upper()
    sym = (symbol or "").strip().lower()
    # Strip *before* applying the default so a whitespace-only direction
    # also falls back to "long" instead of producing a key ending in "|".
    side = (direction or "").strip().lower() or "long"
    if ex:
        return f"{ex}|{sym}|{side}"
    return f"{sym}|{side}"
|
|
|
|
|
|
def parse_position_key(key: str) -> tuple[str, str, str]:
    """Split a position key into ``(exchange, symbol, direction)``.

    Accepted shapes, separated by ``|``:

    * three or more parts: the first three are exchange, symbol, direction
      (any further parts are ignored);
    * two parts: symbol and direction, with an empty exchange;
    * anything else: the whole key, lower-cased, is taken as the symbol with
      an empty exchange.

    An empty direction is reported as ``"long"`` in every shape.  Exchange
    and symbol are returned as found in the multi-part shapes (no case
    normalization).

    Args:
        key: The key string; ``None`` or ``""`` is treated as an empty key.

    Returns:
        A ``(exchange, symbol, direction)`` tuple of strings.
    """
    parts = (key or "").split("|")
    if len(parts) >= 3:
        exchange, symbol, direction = parts[0], parts[1], parts[2]
    elif len(parts) == 2:
        exchange, symbol, direction = "", parts[0], parts[1]
    else:
        return "", (key or "").lower(), "long"
    # Keep the default consistent with the single-part fallback above.
    return exchange, symbol, direction or "long"
|
|
|
|
|
|
class CtpTradingState:
    """In-process CTP snapshot: order book, position book, tick prices, sync state.

    Per the original author's note, counter (broker) reports are treated as
    authoritative and SQLite only carries SL/TP metadata; neither of those
    is visible in this class itself.  Reads and writes of the books happen
    under a re-entrant lock, and the change callback is always invoked after
    that lock has been released.
    """

    def __init__(self) -> None:
        """Create empty books in the ``"idle"`` sync state."""
        # Re-entrant so a method holding the lock may call another locked method.
        self._lock = threading.RLock()
        # Orders keyed by the stripped ``order_id`` / ``vt_order_id``.
        self._orders: dict[str, dict[str, Any]] = {}
        # Positions keyed by ``position_key(exchange, symbol, direction)``.
        self._positions: dict[str, dict[str, Any]] = {}
        # Last accepted price keyed by ``EXCHANGE|symbol``.
        self._tick_prices: dict[str, float] = {}
        # One of "idle", "syncing", "ready".
        self._sync_state = "idle"
        self._last_event_ts: float = 0.0
        self._last_calibrate_ts: float = 0.0
        self._on_change: Optional[Callable[[], None]] = None

    def set_change_callback(self, fn: Optional[Callable[[], None]]) -> None:
        """Register the no-argument change callback; ``None`` clears it."""
        self._on_change = fn

    def _notify(self) -> None:
        """Stamp the last-event time and invoke the change callback, if any.

        A failing callback never propagates to the caller; it is logged with
        its traceback instead.
        """
        with self._lock:
            self._last_event_ts = time.time()
            fn = self._on_change
        if not fn:
            return
        try:
            fn()
        except Exception as exc:
            # Logged above debug level so a broken listener is visible.
            logger.warning(
                "trading state change callback: %s", exc, exc_info=True
            )

    @property
    def sync_state(self) -> str:
        """Current sync state: ``"idle"``, ``"syncing"`` or ``"ready"``."""
        with self._lock:
            return self._sync_state

    def sync_label(self) -> str:
        """Return the display label for the sync state.

        The label is empty unless the state is ``"syncing"`` or ``"ready"``.
        """
        st = self.sync_state
        if st == "syncing":
            return "同步中…"
        if st == "ready":
            return "已同步"
        return ""

    def begin_sync(self) -> None:
        """Switch the sync state to ``"syncing"``."""
        with self._lock:
            self._sync_state = "syncing"

    def finish_sync(self) -> None:
        """Switch the sync state to ``"ready"`` and stamp the calibration time."""
        with self._lock:
            self._sync_state = "ready"
            self._last_calibrate_ts = time.time()

    def needs_calibrate(self) -> bool:
        """Return True when idle, or when the last calibration is at least
        ``CALIBRATE_INTERVAL_SEC`` seconds old."""
        with self._lock:
            if self._sync_state == "idle":
                return True
            return (time.time() - self._last_calibrate_ts) >= CALIBRATE_INTERVAL_SEC

    @staticmethod
    def _order_key(row: dict[str, Any]) -> str:
        """Return the stripped ``order_id`` (else ``vt_order_id``), or ``""``."""
        return str(row.get("order_id") or row.get("vt_order_id") or "").strip()

    @staticmethod
    def _is_id_alias(stored: str, wanted: str) -> bool:
        """Tell whether two different ids name the same order.

        One id must be a suffix of the other AND the suffix must start right
        after a non-alphanumeric separator (e.g. ``CTP.3_5_7`` vs ``3_5_7``).
        A bare suffix test would let ``"1"`` match ``"21"``.
        """
        if len(stored) > len(wanted):
            longer, shorter = stored, wanted
        else:
            longer, shorter = wanted, stored
        if len(longer) == len(shorter) or not longer.endswith(shorter):
            return False
        return not longer[-len(shorter) - 1].isalnum()

    def upsert_order(self, row: dict[str, Any], *, notify: bool = True) -> None:
        """Insert or replace an order, stored as a shallow copy of ``row``.

        Rows without a usable ``order_id`` / ``vt_order_id`` are ignored.

        Args:
            row: Order fields.
            notify: Whether to fire the change callback afterwards.
        """
        oid = self._order_key(row)
        if not oid:
            return
        with self._lock:
            self._orders[oid] = dict(row)
        if notify:
            self._notify()

    def remove_order(self, order_id: str, *, notify: bool = True) -> None:
        """Remove one order by id.

        An exact key match wins.  Otherwise the first stored key that is a
        separator-bounded alias of ``order_id`` (see ``_is_id_alias``) is
        removed.  The change callback fires only if something was removed.

        Args:
            order_id: Id of the order to drop; blank ids are ignored.
            notify: Whether to fire the change callback after a removal.
        """
        oid = (order_id or "").strip()
        if not oid:
            return
        with self._lock:
            if oid in self._orders:
                victim: Optional[str] = oid
            else:
                victim = next(
                    (k for k in self._orders if self._is_id_alias(k, oid)),
                    None,
                )
            if victim is not None:
                del self._orders[victim]
        if victim is not None and notify:
            self._notify()

    def upsert_position(self, row: dict[str, Any], *, notify: bool = True) -> None:
        """Insert, replace or drop a position according to its ``lots``.

        The key is built from the row's ``exchange``, ``symbol`` and
        ``direction`` (default ``"long"``).  ``lots <= 0`` removes the
        position; otherwise a shallow copy of ``row`` is stored with an added
        ``position_key`` field.

        Args:
            row: Position fields.
            notify: Whether to fire the change callback afterwards.
        """
        lots = int(row.get("lots") or 0)
        pk = position_key(
            row.get("exchange") or "",
            row.get("symbol") or "",
            row.get("direction") or "long",
        )
        with self._lock:
            if lots <= 0:
                self._positions.pop(pk, None)
            else:
                stored = dict(row)
                stored["position_key"] = pk
                self._positions[pk] = stored
        if notify:
            self._notify()

    def remove_position(self, pk: str, *, notify: bool = True) -> None:
        """Drop the position stored under ``pk`` (no error if absent).

        Args:
            pk: Position key exactly as stored.
            notify: Whether to fire the change callback afterwards.
        """
        with self._lock:
            self._positions.pop(pk, None)
        if notify:
            self._notify()

    @staticmethod
    def _tick_key(exchange: str, symbol: str) -> str:
        """Build the ``EXCHANGE|symbol`` key shared by the tick setter and getter."""
        return f"{(exchange or '').strip().upper()}|{(symbol or '').strip().lower()}"

    def set_tick_price(self, exchange: str, symbol: str, price: float) -> None:
        """Record the latest price for an instrument.

        Calls with a blank symbol or a price that is ``None``, NaN, zero or
        negative are ignored.  Does not fire the change callback.
        """
        if not (symbol or "").strip():
            return
        # ``not price > 0`` also rejects NaN, which ``price <= 0`` lets through.
        if price is None or not price > 0:
            return
        key = self._tick_key(exchange, symbol)
        with self._lock:
            self._tick_prices[key] = float(price)

    def get_tick_price(self, exchange: str, symbol: str) -> Optional[float]:
        """Return the last recorded price, or ``None`` if unknown or symbol is blank."""
        if not (symbol or "").strip():
            return None
        key = self._tick_key(exchange, symbol)
        with self._lock:
            return self._tick_prices.get(key)

    def get_active_orders(self) -> list[dict[str, Any]]:
        """Return a new list of the stored order rows (the rows are not copied)."""
        with self._lock:
            return list(self._orders.values())

    def get_positions(self) -> list[dict[str, Any]]:
        """Return a new list of the stored position rows (the rows are not copied)."""
        with self._lock:
            return list(self._positions.values())

    def position_keys(self) -> set[str]:
        """Return a snapshot set of the current position keys."""
        with self._lock:
            return set(self._positions.keys())

    def clear(self) -> None:
        """Empty all books and tick prices and return to the ``"idle"`` state.

        Does not fire the change callback.
        """
        with self._lock:
            self._orders.clear()
            self._positions.clear()
            self._tick_prices.clear()
            self._sync_state = "idle"

    def calibrate_from_lists(
        self,
        orders: list[dict[str, Any]],
        positions: list[dict[str, Any]],
    ) -> None:
        """Full calibration: rebuild both books from the given lists.

        Per the original note, the lists are expected to come from vnpy's
        in-memory state.  Orders without a usable id and positions with
        ``lots <= 0`` are skipped.  The new books and the ``"ready"`` state
        are published together under one lock acquisition, then the change
        callback fires.

        If building the new books raises (for example on a malformed row),
        the books are left untouched, the previous sync state is restored so
        it does not stay stuck at ``"syncing"``, and the exception propagates.

        Args:
            orders: Order rows; ``None`` is treated as empty.
            positions: Position rows; ``None`` is treated as empty.
        """
        with self._lock:
            previous_state = self._sync_state
            self.begin_sync()
        try:
            new_orders: dict[str, dict[str, Any]] = {}
            for o in orders or []:
                oid = self._order_key(o)
                if oid:
                    new_orders[oid] = dict(o)
            new_positions: dict[str, dict[str, Any]] = {}
            for p in positions or []:
                if int(p.get("lots") or 0) <= 0:
                    continue
                pk = position_key(
                    p.get("exchange") or "",
                    p.get("symbol") or "",
                    p.get("direction") or "long",
                )
                stored = dict(p)
                stored["position_key"] = pk
                new_positions[pk] = stored
        except Exception:
            with self._lock:
                self._sync_state = previous_state
            raise
        with self._lock:
            self._orders = new_orders
            self._positions = new_positions
            self.finish_sync()
        self._notify()
|
|
|
|
|
|
# Module-level CtpTradingState instance, created when this module is imported.
trading_state = CtpTradingState()
|