"""Hub 中控市价全平后立即同步 order_monitors(三所共用)。""" from __future__ import annotations import time from typing import Any, Callable def reconcile_hub_external_close_impl( conn, symbol: str, direction: str, *, exchange_configured: Callable[[], bool], not_configured_msg: str, symbols_match: Callable[[str, str], bool], get_opened_at_value: Callable[[Any], str], resolve_monitor_exchange_symbol: Callable[[Any], str], get_live_position_contracts: Callable[[str, str], float | None], cancel_conditional_orders: Callable[[str], None], resolve_synced_flat_close: Callable[..., tuple], finalize_stopped_monitor: Callable[..., None], sync_trade_records: Callable[..., None] | None = None, reconcile_flat_streak: dict | None = None, to_ms_with_fallback: Callable[..., int | None] | None = None, prefer_manual_resolve: bool = False, order_row_monitor_type: Callable[[Any], str] | None = None, ) -> dict[str, Any]: if not exchange_configured(): return {"ok": False, "msg": not_configured_msg, "synced": 0} sym_req = (symbol or "").strip() dir_l = (direction or "").strip().lower() if dir_l not in ("long", "short"): return {"ok": False, "msg": "side 须为 long 或 short", "synced": 0} synced = 0 streak = reconcile_flat_streak if reconcile_flat_streak is not None else {} rows = conn.execute( "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" ).fetchall() for r in rows: if not symbols_match(str(r["symbol"] or ""), sym_req): continue if (r["direction"] or "").strip().lower() != dir_l: continue oid = int(r["id"]) if r["status"] == "error": opened_at_chk = get_opened_at_value(r) mtype = order_row_monitor_type(r) if order_row_monitor_type else r["monitor_type"] existing = conn.execute( "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1", (r["symbol"], opened_at_chk, mtype), ).fetchone() if existing: conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) synced += 1 continue exchange_symbol = resolve_monitor_exchange_symbol(r) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) if live_contracts is None: continue if live_contracts > 0: time.sleep(0.6) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) if live_contracts is None or live_contracts > 0: continue streak.pop(oid, None) cancel_conditional_orders(exchange_symbol) opened_at = get_opened_at_value(r) opened_at_ms = None if to_ms_with_fallback is not None: keys = r.keys() if hasattr(r, "keys") else () opened_at_ms = to_ms_with_fallback( r["opened_at_ms"] if "opened_at_ms" in keys else None, opened_at, ) resolve_kw = {"opened_at_ms": opened_at_ms} if prefer_manual_resolve: resolve_kw["prefer_manual"] = True result, pnl_amount, closed_at, miss_reason = resolve_synced_flat_close( r, opened_at, **resolve_kw ) finalize_stopped_monitor( conn, r, result=result, pnl_amount=pnl_amount, closed_at=closed_at, miss_reason=miss_reason, ) synced += 1 if sync_trade_records is not None: try: sync_trade_records(conn, force=True) except Exception: pass return {"ok": True, "synced": synced}