d3955309d9
Co-authored-by: Cursor <cursoragent@cursor.com>
296 lines
10 KiB
Python
296 lines
10 KiB
Python
# Copyright (c) 2025-2026 马建军. All rights reserved.
|
|
# 专有软件 — 未经授权禁止复制、传播、转售。
|
|
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
|
|
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
|
|
|
|
"""从 CTP 柜台同步成交,写入 trade_logs(以交易所成交为准)。"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
from typing import Any, Callable, Optional
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from contract_specs import calc_position_metrics
|
|
from ctp_symbol import ths_to_vnpy_symbol
|
|
from fee_specs import calc_round_trip_fee
|
|
from symbols import ths_to_codes
|
|
from trade_log_lib import calc_equity_after, ensure_trade_log_columns
|
|
from vnpy_bridge import ctp_list_trades, ctp_status
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Time zone used when a fallback close time has to be stamped during sync.
TZ = ZoneInfo("Asia/Shanghai")
|
|
|
|
|
|
def _match_symbol(ctp_sym: str, ths: str) -> bool:
    """Return True when a CTP symbol and a THS symbol refer to the same contract.

    The comparison is case-insensitive and tries, in order:

    1. plain equality (two empty/None inputs therefore match);
    2. equality of the part before the first ``.`` when both are non-empty;
    3. equality of ``ctp_sym`` with the first element returned by
       ``ths_to_vnpy_symbol(ths)``.

    Any exception raised during step 3 is treated as "no match".
    """
    left = (ctp_sym or "").lower()
    right = (ths or "").lower()

    if left == right:
        return True

    same_root = left.partition(".")[0] == right.partition(".")[0]
    if left and right and same_root:
        return True

    try:
        converted, _exchange = ths_to_vnpy_symbol(ths)
        return left == converted.lower()
    except Exception:
        # Best-effort conversion: a failure simply means the symbols do not match.
        return False
|
|
|
|
|
|
def _to_ths_code(symbol: str) -> str:
    """Map a symbol to its THS code via ``ths_to_codes``.

    Returns an empty string for empty/blank/None input.  When the lookup
    yields a mapping, its ``"ths_code"`` entry is returned (or the stripped
    input if that entry is missing or empty); when the lookup yields nothing,
    the stripped input is returned in lower case.
    """
    cleaned = (symbol or "").strip()
    if not cleaned:
        return ""

    mapping = ths_to_codes(cleaned)
    if not mapping:
        return cleaned.lower()
    return mapping.get("ths_code") or cleaned
|
|
|
|
|
|
def _allocate_commission(total_comm: float, matched: int, total_lots: int) -> float:
    """Return the share of ``total_comm`` attributable to ``matched`` lots.

    The share is ``total_comm * matched / total_lots`` rounded to two
    decimals.  If any of the three arguments is zero or negative the share
    is ``0.0``.
    """
    if any(value <= 0 for value in (total_comm, matched, total_lots)):
        return 0.0
    share = total_comm * matched / total_lots
    return round(share, 2)
|
|
|
|
|
|
def build_round_trips(trades: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Pair opening and closing fills into completed round trips (FIFO).

    Fills are processed in ``(datetime, trade_id)`` order and grouped per
    ``(symbol, position_direction)``.  A fill whose ``offset`` is ``"open"``
    (also the default when the key is missing) is queued; any other offset is
    treated as a close and consumes the oldest queued open lots first.  One
    close can therefore produce several trips, and one open can be split
    across several closes.  Close lots that find no queued open lots produce
    no trip.

    Commission on both legs is handed out from the amount not yet allocated,
    pro rata to the lots not yet matched, so the per-trip shares of a fully
    matched fill add up to that fill's commission (to the cent) instead of
    drifting through independent rounding.

    Args:
        trades: Fill dicts.  Keys read: ``datetime``, ``trade_id``,
            ``symbol``, ``position_direction`` (default ``"long"``),
            ``offset`` (default ``"open"``), ``lots``, ``price`` and
            ``commission``.  Fills without a symbol or with non-positive
            lots are ignored.

    Returns:
        One dict per matched open/close pair with the keys
        ``ctp_trade_key``, ``symbol``, ``ths_code``, ``direction``, ``lots``,
        ``entry_price``, ``close_price``, ``open_time``, ``close_time``,
        ``open_trade_id``, ``close_trade_id``, ``fee`` and ``fee_from_ctp``.
    """
    # Local import: only this function needs deque (O(1) removal at the head).
    from collections import deque

    open_queues: dict[tuple[str, str], deque] = defaultdict(deque)
    trips: list[dict[str, Any]] = []

    ordered = sorted(
        trades,
        key=lambda t: ((t.get("datetime") or ""), str(t.get("trade_id") or "")),
    )
    for t in ordered:
        sym = (t.get("symbol") or "").lower()
        pos_dir = (t.get("position_direction") or "long").strip().lower()
        offset = (t.get("offset") or "open").strip().lower()
        lots = int(t.get("lots") or 0)
        if not sym or lots <= 0:
            continue

        queue = open_queues[(sym, pos_dir)]
        if offset == "open":
            # Keep a copy of the fill plus running counters for partial matches.
            queue.append({
                **t,
                "remaining": lots,
                "commission_remaining": float(t.get("commission") or 0),
            })
            continue

        close_lots_left = lots
        close_comm_left = float(t.get("commission") or 0)
        close_price = float(t.get("price") or 0)
        close_time = t.get("datetime") or ""
        close_trade_id = str(t.get("trade_id") or "")

        while close_lots_left > 0 and queue:
            open_t = queue[0]
            open_rem = int(open_t.get("remaining") or 0)
            matched = min(close_lots_left, open_rem)
            if matched <= 0:
                # Exhausted open entry left at the head; discard it.
                queue.popleft()
                continue

            open_comm_rem = float(open_t.get("commission_remaining") or 0)
            open_comm_share = _allocate_commission(open_comm_rem, matched, open_rem)
            # Allocate from what is still unassigned so the final match of
            # this close absorbs the rounding remainder.
            close_comm_share = _allocate_commission(
                close_comm_left, matched, close_lots_left,
            )

            open_t["remaining"] = open_rem - matched
            open_t["commission_remaining"] = round(
                max(0.0, open_comm_rem - open_comm_share), 2,
            )
            if open_t["remaining"] <= 0:
                queue.popleft()

            close_comm_left = round(max(0.0, close_comm_left - close_comm_share), 2)
            close_lots_left -= matched

            open_trade_id = str(open_t.get("trade_id") or "")
            ctp_key = f"{open_trade_id}|{close_trade_id}|{sym}|{pos_dir}|{matched}"
            trip_fee = round(open_comm_share + close_comm_share, 2)
            trips.append({
                "ctp_trade_key": ctp_key,
                "symbol": sym,
                "ths_code": _to_ths_code(sym),
                "direction": pos_dir,
                "lots": matched,
                "entry_price": float(open_t.get("price") or 0),
                "close_price": close_price,
                "open_time": open_t.get("datetime") or "",
                "close_time": close_time,
                "open_trade_id": open_trade_id,
                "close_trade_id": close_trade_id,
                "fee": trip_fee,
                # A zero fee means no commission was reported for either leg.
                "fee_from_ctp": trip_fee > 0,
            })
    return trips
|
|
|
|
|
|
def _find_monitor_meta(
|
|
conn,
|
|
*,
|
|
symbol: str,
|
|
direction: str,
|
|
open_time: str,
|
|
match_symbol_fn: Callable[[str, str], bool] | None = None,
|
|
) -> dict[str, Any]:
|
|
match = match_symbol_fn or _match_symbol
|
|
direction = (direction or "long").strip().lower()
|
|
best: Optional[dict[str, Any]] = None
|
|
for r in conn.execute(
|
|
"SELECT * FROM trade_order_monitors ORDER BY id DESC LIMIT 200"
|
|
).fetchall():
|
|
row = dict(r)
|
|
if (row.get("direction") or "long").strip().lower() != direction:
|
|
continue
|
|
if not match(symbol, row.get("symbol") or ""):
|
|
continue
|
|
if best is None:
|
|
best = row
|
|
continue
|
|
ot = (row.get("open_time") or "").strip()
|
|
if open_time and ot and abs(len(ot) - len(open_time)) <= 2 and ot[:16] == open_time[:16]:
|
|
return row
|
|
return best or {}
|
|
|
|
|
|
def _holding_minutes(open_time: str, close_time: str) -> int:
    """Return the holding time in whole minutes, or 0 when it cannot be computed.

    Delegates to ``app.holding_to_minutes``; the import happens at call time
    and any failure (import or computation) yields 0.
    """
    minutes = 0
    try:
        from app import holding_to_minutes

        minutes = int(holding_to_minutes(open_time, close_time) or 0)
    except Exception:
        # Best-effort: the holding time is informational only.
        minutes = 0
    return minutes
|
|
|
|
|
|
def sync_trade_logs_from_ctp(
    conn,
    mode: str,
    *,
    capital: float = 0.0,
    trading_mode: str = "simulation",
) -> dict[str, Any]:
    """Fetch fills from CTP and upsert the resulting round trips into ``trade_logs``.

    Fills are paired with ``build_round_trips``; each trip is identified by
    its ``ctp_trade_key`` and either updates the existing row with that key
    or is inserted as a new row marked ``source='ctp'`` and ``verified=1``.
    Stop-loss, take-profit and monitor type are taken from the matching
    ``trade_order_monitors`` row when one is found.  The fee reported by CTP
    is used when positive; otherwise it is computed with
    ``calc_round_trip_fee``.  This function does not call ``commit``.

    Args:
        conn: Database connection used for all reads and writes.
        mode: CTP mode passed to ``ctp_status`` and ``ctp_list_trades``.
        capital: Account capital used for position metrics and equity.
        trading_mode: Forwarded to ``calc_round_trip_fee``.

    Returns:
        A summary dict with the counters ``synced`` (inserted), ``updated``
        and ``skipped`` plus the flag ``connected``.  When CTP is not
        connected the counters are all zero and nothing is written.
    """
    summary = {"synced": 0, "updated": 0, "skipped": 0, "connected": False}
    status = ctp_status(mode)
    if not status.get("connected"):
        return summary
    summary["connected"] = True

    ensure_trade_log_columns(conn)
    # Best-effort schema migration: a failure is deliberately ignored
    # (presumably the column already exists).
    for ddl in (
        "ALTER TABLE trade_logs ADD COLUMN source TEXT DEFAULT 'local'",
        "ALTER TABLE trade_logs ADD COLUMN ctp_trade_key TEXT",
    ):
        try:
            conn.execute(ddl)
        except Exception:
            pass

    update_sql = """UPDATE trade_logs SET
        symbol=?, symbol_name=?, market_code=?, sina_code=?, monitor_type=?,
        direction=?, entry_price=?, stop_loss=?, take_profit=?, close_price=?,
        lots=?, margin=?, margin_pct=?, holding_minutes=?, open_time=?, close_time=?,
        pnl=?, fee=?, pnl_net=?, equity_after=?, result=?, source='ctp', verified=1
        WHERE ctp_trade_key=?"""
    insert_sql = """INSERT INTO trade_logs
        (symbol, symbol_name, market_code, sina_code, monitor_type, direction,
        entry_price, stop_loss, take_profit, close_price, lots, margin,
        margin_pct, holding_minutes, open_time, close_time, pnl, fee, pnl_net,
        equity_after, result, source, ctp_trade_key, verified)
        VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"""

    fills = ctp_list_trades(mode, refresh=True)
    for trip in build_round_trips(fills):
        trade_key = trip.get("ctp_trade_key") or ""
        if not trade_key:
            summary["skipped"] += 1
            continue
        already_logged = conn.execute(
            "SELECT id FROM trade_logs WHERE ctp_trade_key=?",
            (trade_key,),
        ).fetchone()

        ths = trip.get("ths_code") or trip.get("symbol") or ""
        codes = ths_to_codes(ths) or {}
        direction = trip.get("direction") or "long"
        entry = float(trip.get("entry_price") or 0)
        exit_price = float(trip.get("close_price") or 0)
        lots = float(trip.get("lots") or 0)
        open_time = trip.get("open_time") or ""
        # A missing close time is stamped with the current Shanghai time.
        close_time = trip.get("close_time") or datetime.now(TZ).strftime("%Y-%m-%dT%H:%M")

        monitor = _find_monitor_meta(
            conn,
            symbol=trip.get("symbol") or ths,
            direction=direction,
            open_time=open_time,
        )
        stop_loss = monitor.get("stop_loss")
        take_profit = monitor.get("take_profit")
        # The metrics call needs numbers: fall back to the entry price when a
        # level is missing or not convertible.
        try:
            stop_for_calc = entry if stop_loss is None else float(stop_loss)
            target_for_calc = entry if take_profit is None else float(take_profit)
        except (TypeError, ValueError):
            stop_for_calc = target_for_calc = entry

        metrics = calc_position_metrics(
            direction, entry, stop_for_calc, target_for_calc, lots, exit_price, capital, ths,
        )
        pnl = float(metrics.get("float_pnl") or 0)

        ctp_fee = float(trip.get("fee") or 0)
        if ctp_fee > 0:
            fee = round(ctp_fee, 2)
        else:
            fee = calc_round_trip_fee(
                ths, entry, exit_price, lots, open_time, close_time, trading_mode=trading_mode,
            )
        pnl_net = round(pnl - fee, 2)

        # Order must match the column lists in update_sql / insert_sql.
        row_vals = (
            ths,
            codes.get("name") or monitor.get("symbol_name") or ths,
            codes.get("market_code") or monitor.get("market_code") or "",
            codes.get("sina_code") or monitor.get("sina_code") or "",
            monitor.get("monitor_type") or "CTP同步",
            direction,
            entry,
            stop_loss,
            take_profit,
            exit_price,
            lots,
            metrics.get("margin"),
            metrics.get("position_pct"),
            _holding_minutes(open_time, close_time),
            open_time,
            close_time,
            pnl,
            fee,
            pnl_net,
            calc_equity_after(capital, pnl_net),
            "CTP同步",
        )

        if already_logged:
            conn.execute(update_sql, row_vals + (trade_key,))
            summary["updated"] += 1
        else:
            conn.execute(insert_sql, row_vals + ("ctp", trade_key, 1))
            summary["synced"] += 1

    if summary["synced"] or summary["updated"]:
        # Refreshing cached statistics is optional; failures are only logged.
        try:
            from stats_engine import refresh_stats_cache

            refresh_stats_cache(conn, capital)
        except Exception as exc:
            logger.debug("stats refresh after ctp trade sync: %s", exc)
    return summary
|