Fix CTP exchange routing for non-SHFE contracts and duplicate trade closes.

Resolve CZCE/DCE symbols to the correct exchange for orders, dedupe stop-loss closes and trade logs, and rely on CTP sync for authoritative records.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-26 14:06:49 +08:00
parent 382a9a0e14
commit 4ef33a367f
7 changed files with 191 additions and 45 deletions
+6 -2
View File
@@ -32,8 +32,12 @@ def ths_to_vnpy_symbol(ths_code: str) -> Tuple[str, str]:
"""
code = (ths_code or "").strip()
codes = ths_to_codes(code)
ex = (codes.get("ex") if codes else None) or "SHFE"
ex = _EX_MAP.get(ex, "SHFE")
ex = (codes.get("ex") if codes else None)
if not ex and codes:
mc = (codes.get("market_code") or "")
if "." in mc:
ex = mc.rsplit(".", 1)[-1]
ex = _EX_MAP.get(ex or "SHFE", "SHFE")
m = re.match(r"^([A-Za-z]+)(\d+)$", code)
if not m:
return code, ex
+4 -1
View File
@@ -16,7 +16,7 @@ from contract_specs import calc_position_metrics
from ctp_symbol import ths_to_vnpy_symbol
from fee_specs import calc_round_trip_fee
from symbols import ths_to_codes
from trade_log_lib import calc_equity_after, ensure_trade_log_columns
from trade_log_lib import calc_equity_after, purge_duplicate_local_trade_logs, ensure_trade_log_columns
from vnpy_bridge import ctp_list_trades, ctp_status
logger = logging.getLogger(__name__)
@@ -292,4 +292,7 @@ def sync_trade_logs_from_ctp(
refresh_stats_cache(conn, capital)
except Exception as exc:
logger.debug("stats refresh after ctp trade sync: %s", exc)
purged = purge_duplicate_local_trade_logs(conn)
if purged:
stats["purged"] = purged
return stats
+18 -16
View File
@@ -1639,23 +1639,25 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
lots=lots, price=price, settings=_settings_dict(),
order_type="market",
)
write_manual_close_trade_log(
conn,
mon,
symbol=sym,
direction=direction,
lots=lots,
close_price=price,
entry_price=entry or price,
trading_mode=mode,
capital=capital,
stop_loss=float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None,
take_profit=float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None,
open_time=(mon.get("open_time") or "") if mon else "",
symbol_name=(mon.get("symbol_name") or "") if mon else "",
market_code=(mon.get("market_code") or "") if mon else "",
)
if not ctp_status(mode).get("connected"):
write_manual_close_trade_log(
conn,
mon,
symbol=sym,
direction=direction,
lots=lots,
close_price=price,
entry_price=entry or price,
trading_mode=mode,
capital=capital,
stop_loss=float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None,
take_profit=float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None,
open_time=(mon.get("open_time") or "") if mon else "",
symbol_name=(mon.get("symbol_name") or "") if mon else "",
market_code=(mon.get("market_code") or "") if mon else "",
)
if mid:
_close_duplicate_monitors(conn, sym, direction, mid)
conn.execute(
"UPDATE trade_order_monitors SET status='closed' WHERE id=?",
(mid,),
+71 -15
View File
@@ -39,6 +39,7 @@ PLACE_COOLDOWN_SEC = 3
_last_close_attempt: dict[int, float] = {}
_closing_monitors: set[int] = set()
_closing_symbol_keys: set[str] = set()
_closing_lock = threading.Lock()
MONITOR_ORDER_COLUMNS = (
@@ -137,6 +138,69 @@ def _find_position(positions: list[dict], ths_code: str, direction: str) -> Opti
return None
def _position_key(sym: str, direction: str) -> str:
return f"{(sym or '').strip().lower()}|{(direction or 'long').strip().lower()}"
def _try_acquire_close_symbol(sym: str, direction: str) -> bool:
key = _position_key(sym, direction)
with _closing_lock:
if key in _closing_symbol_keys:
return False
_closing_symbol_keys.add(key)
return True
def _release_close_symbol(sym: str, direction: str) -> None:
key = _position_key(sym, direction)
with _closing_lock:
_closing_symbol_keys.discard(key)
def _close_all_monitors_for_symbol(conn, sym: str, direction: str) -> None:
direction = (direction or "long").strip().lower()
for r in conn.execute(
"SELECT id, symbol, direction FROM trade_order_monitors WHERE status='active'"
).fetchall():
if (r["direction"] or "long") != direction:
continue
if _match_symbol(sym, r["symbol"] or ""):
conn.execute(
"UPDATE trade_order_monitors SET status='closed' WHERE id=?",
(r["id"],),
)
def _dedupe_active_monitors(conn) -> None:
"""同一品种方向只保留一条 active 监控,避免重复触发平仓。"""
groups: dict[str, list[dict]] = {}
for r in conn.execute(
"SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id ASC"
).fetchall():
row = dict(r)
key = _position_key(row.get("symbol") or "", row.get("direction") or "long")
groups.setdefault(key, []).append(row)
for items in groups.values():
if len(items) <= 1:
continue
def _keep_score(m: dict) -> tuple:
mt = (m.get("monitor_type") or "").lower()
score = 0
if mt != "ctp_sync":
score += 10
if m.get("stop_loss") is not None:
score += 5
return (score, int(m.get("id") or 0))
items.sort(key=_keep_score, reverse=True)
for dup in items[1:]:
conn.execute(
"UPDATE trade_order_monitors SET status='closed' WHERE id=?",
(dup["id"],),
)
def _can_close_now(monitor_id: int, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool:
last = _last_close_attempt.get(monitor_id, 0.0)
return (time.time() - last) >= cooldown
@@ -575,6 +639,7 @@ def _execute_local_close(
positions = ctp_list_positions(mode)
pos = _find_position(positions, sym, direction)
if not pos:
_close_all_monitors_for_symbol(conn, sym, direction)
reconcile_monitors_without_position(conn, mode)
return
lots = int(pos.get("lots") or mon.get("lots") or 1)
@@ -590,24 +655,16 @@ def _execute_local_close(
price=mark,
order_type="market",
)
_write_trade_log(
conn,
mon,
close_price=mark,
reason=reason,
trading_mode=mode,
capital=capital,
)
conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],))
_close_all_monitors_for_symbol(conn, sym, direction)
conn.commit()
result_label = _result_for_close(mon, reason)
logger.info(
"止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s",
"止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s(待 CTP 成交同步写入交易记录)",
mon.get("id"), result_label, sym, direction, lots, mark,
)
if notify_fn:
try:
notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark}已记入交易记录")
notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark}平仓委托已提交")
except Exception as exc:
logger.debug("SL/TP notify failed: %s", exc)
@@ -627,12 +684,12 @@ def check_monitors_locally(
if not is_trading_session():
return 0
reconcile_monitors_without_position(conn, mode)
_dedupe_active_monitors(conn)
conn.commit()
closed = 0
rows = [dict(r) for r in conn.execute(
"SELECT * FROM trade_order_monitors WHERE status='active'"
).fetchall()]
conn.commit()
for mon in rows:
mid = int(mon.get("id") or 0)
sym = (mon.get("symbol") or "").strip()
@@ -677,7 +734,7 @@ def check_monitors_locally(
continue
if mid > 0 and not _can_close_now(mid):
continue
if mid > 0 and not _try_acquire_close(mid):
if not _try_acquire_close_symbol(sym, direction):
continue
try:
_execute_local_close(
@@ -695,8 +752,7 @@ def check_monitors_locally(
except Exception as exc:
logger.warning("SL/TP local close failed monitor=%s: %s", mid, exc)
finally:
if mid > 0:
_release_close(mid)
_release_close_symbol(sym, direction)
return closed
+28 -10
View File
@@ -140,6 +140,17 @@ def _find_product_by_letters(letters: str) -> Optional[dict]:
return None
def _product_codes(product: dict, ths_code: str, market_code: str, sina_code: str) -> dict:
return {
"ths_code": ths_code,
"market_code": market_code,
"sina_code": sina_code,
"ex": product["ex"],
"name": product["name"],
"exchange": product["exchange"],
}
def ths_to_codes(ths_code: str) -> Optional[dict]:
"""同花顺合约代码 -> ths_full + sina 回退代码。"""
code = ths_code.strip()
@@ -155,11 +166,13 @@ def ths_to_codes(ths_code: str) -> Optional[dict]:
return None
product = _find_product_by_letters(letters)
if product:
return {
"ths_code": build_ths_code(product, year, month),
"market_code": build_ths_full_code(product, year, month),
"sina_code": build_sina_code(product, year, month),
}
ths = build_ths_code(product, year, month)
return _product_codes(
product,
ths,
build_ths_full_code(product, year, month),
build_sina_code(product, year, month),
)
letters_up = letters.upper()
if letters_up in ("IF", "IH", "IC", "IM", "T", "TF", "TS"):
ths = f"{letters_up}{digits}"
@@ -167,6 +180,9 @@ def ths_to_codes(ths_code: str) -> Optional[dict]:
"ths_code": ths,
"market_code": f"{ths}.CFFEX",
"sina_code": f"CFF_RE_{letters_up}{digits}",
"ex": "CFFEX",
"name": letters_up,
"exchange": "中金所",
}
m3 = re.match(r"^([A-Za-z]+)(\d{3})$", code)
@@ -183,11 +199,13 @@ def ths_to_codes(ths_code: str) -> Optional[dict]:
candidate += 10
product = _find_product_by_letters(letters)
if product:
return {
"ths_code": build_ths_code(product, candidate, month),
"market_code": build_ths_full_code(product, candidate, month),
"sina_code": build_sina_code(product, candidate, month),
}
ths = build_ths_code(product, candidate, month)
return _product_codes(
product,
ths,
build_ths_full_code(product, candidate, month),
build_sina_code(product, candidate, month),
)
return None
+45
View File
@@ -32,6 +32,51 @@ def calc_equity_after(capital: float, pnl_net: float) -> float | None:
return round(cap + float(pnl_net or 0), 2)
def _norm_symbol(symbol: str) -> str:
return (symbol or "").split(".")[0].strip().lower()
def _norm_close_minute(ts: str) -> str:
"""统一 close_time 到分钟粒度,兼容 ISO `T` 与空格分隔。"""
return (ts or "").strip().replace("T", " ")[:16]
def purge_duplicate_local_trade_logs(conn) -> int:
"""删除已被 CTP 柜台记录覆盖的本地重复成交。"""
removed = 0
ctp_rows = [
dict(r)
for r in conn.execute("SELECT * FROM trade_logs WHERE source='ctp'").fetchall()
]
local_rows = [
dict(r)
for r in conn.execute(
"""SELECT * FROM trade_logs
WHERE COALESCE(source, 'local') != 'ctp'
AND (ctp_trade_key IS NULL OR ctp_trade_key = '')"""
).fetchall()
]
for ctp in ctp_rows:
ct16 = _norm_close_minute(ctp.get("close_time") or "")
sym_n = _norm_symbol(ctp.get("symbol") or "")
lots = float(ctp.get("lots") or 0)
direction = (ctp.get("direction") or "long").strip().lower()
for loc in local_rows:
if loc.get("id") == ctp.get("id"):
continue
if _norm_symbol(loc.get("symbol") or "") != sym_n:
continue
if (loc.get("direction") or "long").strip().lower() != direction:
continue
if _norm_close_minute(loc.get("close_time") or "") != ct16:
continue
if abs(float(loc.get("lots") or 0) - lots) > 0.01:
continue
conn.execute("DELETE FROM trade_logs WHERE id=?", (loc["id"],))
removed += 1
return removed
def enrich_trades_for_records(
trades: list[dict[str, Any]],
*,
+19 -1
View File
@@ -556,7 +556,21 @@ class CtpBridge:
return Offset.CLOSE
pos = self._find_position(sym, ex_u, hold_direction)
if not pos:
# 找不到持仓明细时,日盘新开仓优先平今(避免 SHFE「平昨仓位不足」)
for p in self._collect_positions():
ps = (p.get("symbol") or "").lower()
if ps != sym.lower():
continue
if (p.get("direction") or "long") != hold_direction:
continue
td = int(p.get("td_volume") or 0)
yd = int(p.get("yd_volume") or 0)
if td >= lots:
return Offset.CLOSETODAY
if yd >= lots:
return Offset.CLOSEYESTERDAY
if td + yd >= lots:
return Offset.CLOSETODAY
break
if ex_u in ("SHFE", "INE", "CZCE"):
return Offset.CLOSETODAY
return Offset.CLOSE
@@ -1112,6 +1126,8 @@ class CtpBridge:
sym, ex_name, d, vol, price, pos=pos,
)
open_time = self._lookup_position_open_time(sym, d) or None
yd = int(getattr(pos, "yd_volume", 0) or 0)
td = max(0, vol - yd)
out.append({
"symbol": sym,
"exchange": ex_name,
@@ -1122,6 +1138,8 @@ class CtpBridge:
"frozen": int(getattr(pos, "frozen", 0) or 0),
"margin": margin,
"open_time": open_time,
"yd_volume": yd,
"td_volume": td,
})
return out