# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """从 CTP 柜台同步成交,写入 trade_logs(以交易所成交为准)。""" from __future__ import annotations import logging from collections import defaultdict from datetime import datetime from typing import Any, Callable, Optional from zoneinfo import ZoneInfo from contract_specs import calc_position_metrics from ctp_symbol import ths_to_vnpy_symbol from fee_specs import calc_round_trip_fee from symbols import ths_to_codes from trade_log_lib import calc_equity_after, purge_duplicate_local_trade_logs, ensure_trade_log_columns from vnpy_bridge import ctp_list_trades, ctp_status logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") def _match_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() b = (ths or "").lower() if a == b: return True if a and b and a.split(".")[0] == b.split(".")[0]: return True try: vnpy_sym, _ = ths_to_vnpy_symbol(ths) if a == vnpy_sym.lower(): return True except Exception: pass return False def _to_ths_code(symbol: str) -> str: sym = (symbol or "").strip() if not sym: return "" codes = ths_to_codes(sym) if codes: return codes.get("ths_code") or sym return sym.lower() def _allocate_commission(total_comm: float, matched: int, total_lots: int) -> float: if total_comm <= 0 or matched <= 0 or total_lots <= 0: return 0.0 return round(total_comm * matched / total_lots, 2) def build_round_trips(trades: list[dict[str, Any]]) -> list[dict[str, Any]]: """按 FIFO 将开/平仓成交配对为完整回合。""" stacks: dict[tuple[str, str], list[dict[str, Any]]] = defaultdict(list) trips: list[dict[str, Any]] = [] ordered = sorted( trades, key=lambda t: ((t.get("datetime") or ""), str(t.get("trade_id") or "")), ) for t in ordered: sym = (t.get("symbol") or "").lower() pos_dir = (t.get("position_direction") or "long").strip().lower() offset = (t.get("offset") or "open").strip().lower() lots = int(t.get("lots") or 0) if not sym or lots <= 0: continue key = (sym, pos_dir) if offset == "open": stacks[key].append({ **t, "remaining": lots, "commission_remaining": float(t.get("commission") or 0), }) continue close_lots_total = lots close_lots_left = lots close_price = float(t.get("price") or 0) close_time = t.get("datetime") or "" close_trade_id = str(t.get("trade_id") or "") close_comm_total = float(t.get("commission") or 0) while close_lots_left > 0 and stacks[key]: open_t = stacks[key][0] open_rem = int(open_t.get("remaining") or 0) matched = min(close_lots_left, open_rem) if matched <= 0: stacks[key].pop(0) continue open_comm_rem = float(open_t.get("commission_remaining") or 0) open_comm_share = ( _allocate_commission(open_comm_rem, matched, open_rem) if open_rem > 0 else 0.0 ) close_comm_share = _allocate_commission( close_comm_total, matched, close_lots_total, ) open_t["remaining"] = open_rem - matched open_t["commission_remaining"] = round( max(0.0, open_comm_rem - open_comm_share), 2, ) if open_t["remaining"] <= 0: stacks[key].pop(0) close_lots_left -= matched open_trade_id = str(open_t.get("trade_id") or "") ctp_key = f"{open_trade_id}|{close_trade_id}|{sym}|{pos_dir}|{matched}" trip_fee = round(open_comm_share + close_comm_share, 2) trips.append({ "ctp_trade_key": ctp_key, "symbol": sym, "ths_code": _to_ths_code(sym), "direction": pos_dir, "lots": matched, "entry_price": float(open_t.get("price") or 0), "close_price": close_price, "open_time": open_t.get("datetime") or "", "close_time": close_time, "open_trade_id": open_trade_id, "close_trade_id": close_trade_id, "fee": trip_fee, "fee_from_ctp": trip_fee > 0, }) return trips def _find_monitor_meta( conn, *, symbol: str, direction: str, open_time: str, match_symbol_fn: Callable[[str, str], bool] | None = None, ) -> dict[str, Any]: match = match_symbol_fn or _match_symbol direction = (direction or "long").strip().lower() best: Optional[dict[str, Any]] = None for r in conn.execute( "SELECT * FROM trade_order_monitors ORDER BY id DESC LIMIT 200" ).fetchall(): row = dict(r) if (row.get("direction") or "long").strip().lower() != direction: continue if not match(symbol, row.get("symbol") or ""): continue if best is None: best = row continue ot = (row.get("open_time") or "").strip() if open_time and ot and abs(len(ot) - len(open_time)) <= 2 and ot[:16] == open_time[:16]: return row return best or {} def _holding_minutes(open_time: str, close_time: str) -> int: try: from app import holding_to_minutes return int(holding_to_minutes(open_time, close_time) or 0) except Exception: return 0 def sync_trade_logs_from_ctp( conn, mode: str, *, capital: float = 0.0, trading_mode: str = "simulation", ) -> dict[str, Any]: """查询 CTP 成交并 upsert 到 trade_logs。返回同步摘要。""" stats = {"synced": 0, "updated": 0, "skipped": 0, "connected": False} if not ctp_status(mode).get("connected"): return stats stats["connected"] = True ensure_trade_log_columns(conn) try: conn.execute("ALTER TABLE trade_logs ADD COLUMN source TEXT DEFAULT 'local'") except Exception: pass try: conn.execute("ALTER TABLE trade_logs ADD COLUMN ctp_trade_key TEXT") except Exception: pass trades = ctp_list_trades(mode, refresh=True) trips = build_round_trips(trades) for trip in trips: key = trip.get("ctp_trade_key") or "" if not key: stats["skipped"] += 1 continue existing = conn.execute( "SELECT id FROM trade_logs WHERE ctp_trade_key=?", (key,), ).fetchone() ths = trip.get("ths_code") or trip.get("symbol") or "" codes = ths_to_codes(ths) or {} direction = trip.get("direction") or "long" entry = float(trip.get("entry_price") or 0) close_px = float(trip.get("close_price") or 0) lots = float(trip.get("lots") or 0) open_time = trip.get("open_time") or "" close_time = trip.get("close_time") or datetime.now(TZ).strftime("%Y-%m-%dT%H:%M") mon = _find_monitor_meta( conn, symbol=trip.get("symbol") or ths, direction=direction, open_time=open_time, ) sl = mon.get("stop_loss") tp = mon.get("take_profit") try: sl_f = float(sl) if sl is not None else entry tp_f = float(tp) if tp is not None else entry except (TypeError, ValueError): sl_f, tp_f = entry, entry metrics = calc_position_metrics( direction, entry, sl_f, tp_f, lots, close_px, capital, ths, ) pnl = float(metrics.get("float_pnl") or 0) trip_fee = float(trip.get("fee") or 0) if trip_fee > 0: fee = round(trip_fee, 2) else: fee = calc_round_trip_fee( ths, entry, close_px, lots, open_time, close_time, trading_mode=trading_mode, ) pnl_net = round(pnl - fee, 2) margin_pct = metrics.get("position_pct") equity_after = calc_equity_after(capital, pnl_net) minutes = _holding_minutes(open_time, close_time) result = "CTP同步" monitor_type = mon.get("monitor_type") or "CTP同步" row_vals = ( ths, codes.get("name") or mon.get("symbol_name") or ths, codes.get("market_code") or mon.get("market_code") or "", codes.get("sina_code") or mon.get("sina_code") or "", monitor_type, direction, entry, sl if sl is not None else None, tp if tp is not None else None, close_px, lots, metrics.get("margin"), margin_pct, minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result, ) if existing: conn.execute( """UPDATE trade_logs SET symbol=?, symbol_name=?, market_code=?, sina_code=?, monitor_type=?, direction=?, entry_price=?, stop_loss=?, take_profit=?, close_price=?, lots=?, margin=?, margin_pct=?, holding_minutes=?, open_time=?, close_time=?, pnl=?, fee=?, pnl_net=?, equity_after=?, result=?, source='ctp', verified=1 WHERE ctp_trade_key=?""", row_vals + (key,), ) stats["updated"] += 1 else: conn.execute( """INSERT INTO trade_logs (symbol, symbol_name, market_code, sina_code, monitor_type, direction, entry_price, stop_loss, take_profit, close_price, lots, margin, margin_pct, holding_minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result, source, ctp_trade_key, verified) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", row_vals + ("ctp", key, 1), ) stats["synced"] += 1 try: from trade_notify import notify_trade_log_close from trading_context import trading_mode_label from app import get_setting, send_wechat_msg from ai_worker import schedule_ai_event_analysis from db_conn import DB_PATH notify_trade_log_close( send_wechat=send_wechat_msg, get_setting=get_setting, mode_label=trading_mode_label(get_setting), capital=capital, sym=ths, symbol_name=codes.get("name") or mon.get("symbol_name") or ths, direction=direction, entry=entry, close_price=close_px, sl=float(sl) if sl is not None else None, tp=float(tp) if tp is not None else None, lots=lots, pnl_net=pnl_net, equity_after=equity_after, holding_minutes=minutes, result=result, monitor_type=monitor_type, schedule_ai_fn=schedule_ai_event_analysis, db_path=DB_PATH, ) except Exception as exc: logger.debug("ctp close notify: %s", exc) if stats["synced"] or stats["updated"]: try: from stats_engine import refresh_stats_cache refresh_stats_cache(conn, capital) except Exception as exc: logger.debug("stats refresh after ctp trade sync: %s", exc) purged = purge_duplicate_local_trade_logs(conn) if purged: stats["purged"] = purged return stats