feat: 期货下单写入DB来源与开仓时间,CTP同步均价保证金现价

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-25 15:59:15 +08:00
parent 0741997818
commit 649c064c2f
3 changed files with 148 additions and 59 deletions
+136 -54
View File
@@ -39,6 +39,7 @@ from sl_tp_guard import (
cancel_monitor_exit_orders,
ensure_monitor_order_columns,
monitor_order_status,
monitor_source_label,
place_monitor_exit_orders,
reconcile_monitors_without_position,
start_sl_tp_guard_worker,
@@ -188,8 +189,9 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
continue
existing = _find_active_monitor(conn, ths, direction)
if existing:
_sync_monitor_lots_from_ctp(
_sync_monitor_from_ctp(
conn, int(existing["id"]), ths, direction, mode, ctp=p,
capital=_capital(conn),
)
continue
sl, tp, trailing_be, initial_sl = _restore_sl_tp_from_closed(conn, ths, direction)
@@ -409,6 +411,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
tp,
trailing_be: int,
ctp_open_time: Optional[str] = None,
open_time: Optional[str] = None,
monitor_type: str = "manual",
) -> int:
ensure_monitor_order_columns(conn)
@@ -428,13 +431,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
initial_sl = sl_f
if not trailing_be:
trailing_be = int(existing.get("trailing_be") or 0)
open_time_val = existing.get("open_time") or now_s
if ctp_open_time:
open_time_val = (existing.get("open_time") or "").strip() or now_s
if open_time:
open_time_val = open_time
elif monitor_type == "ctp_sync" and ctp_open_time:
open_time_val = ctp_open_time
conn.execute(
"""UPDATE trade_order_monitors SET
symbol=?, symbol_name=?, market_code=?, lots=?, entry_price=?,
stop_loss=?, take_profit=?, initial_stop_loss=?, trailing_be=?, open_time=?
stop_loss=?, take_profit=?, initial_stop_loss=?, trailing_be=?, open_time=?,
monitor_type=?
WHERE id=?""",
(
sym,
@@ -447,11 +453,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
initial_sl,
trailing_be,
open_time_val,
monitor_type if monitor_type != "manual" else (existing.get("monitor_type") or "manual"),
mid,
),
)
else:
open_time_val = ctp_open_time or now_s
if open_time:
open_time_val = open_time
elif monitor_type == "ctp_sync" and ctp_open_time:
open_time_val = ctp_open_time
else:
open_time_val = now_s
conn.execute(
"""INSERT INTO trade_order_monitors (
symbol, symbol_name, market_code, direction, lots, entry_price,
@@ -477,10 +489,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
_close_duplicate_monitors(conn, sym, direction, mid)
return mid
def _sync_monitor_lots_from_ctp(
conn, mid: int, sym: str, direction: str, mode: str, *, ctp: Optional[dict] = None,
def _sync_monitor_from_ctp(
conn,
mid: int,
sym: str,
direction: str,
mode: str,
*,
ctp: Optional[dict] = None,
capital: float = 0.0,
) -> None:
positions = [ctp] if ctp else _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False)
"""CTP 同步:均价、现价、保证金、仓位占比写入数据库;不覆盖期货下单的开仓时间。"""
positions = [ctp] if ctp else _ctp_positions(mode, refresh_if_empty=False, refresh_margin=True)
for p in positions:
if not p or int(p.get("lots") or 0) <= 0:
continue
@@ -488,25 +508,55 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
continue
if not _match_ctp_symbol(p.get("symbol") or "", sym):
continue
ctp_open = (p.get("open_time") or "").strip() or None
row = conn.execute(
"SELECT open_time FROM trade_order_monitors WHERE id=?", (mid,),
"SELECT open_time, monitor_type FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
db_open = (row["open_time"] or "").strip() if row else ""
open_time_val = ctp_open or db_open
monitor_type = (row["monitor_type"] or "manual").strip().lower() if row else "manual"
ctp_open = (p.get("open_time") or "").strip() or None
open_time_val = db_open
if monitor_type == "ctp_sync" and ctp_open:
open_time_val = ctp_open
lots = int(p.get("lots") or 0)
entry = float(p.get("avg_price") or 0)
ctp_margin = float(p.get("margin") or 0)
float_pnl = p.get("pnl")
if float_pnl is not None:
float_pnl = round(float(float_pnl), 2)
mark = None
if ctp_status(mode).get("connected"):
mark = ctp_get_tick_price(mode, sym)
if mark is None or mark <= 0:
mark = entry if entry else None
margin = ctp_margin if ctp_margin > 0 else None
position_pct = None
if margin and capital > 0:
position_pct = round(float(margin) / float(capital) * 100, 2)
execute_retry(
conn,
"""UPDATE trade_order_monitors SET lots=?, entry_price=?,
open_time=? WHERE id=?""",
open_time=?, margin=?, position_pct=?, mark_price=?, float_pnl=?
WHERE id=?""",
(
int(p.get("lots") or 0),
float(p.get("avg_price") or 0),
lots,
entry,
open_time_val,
margin,
position_pct,
float(mark) if mark else None,
float_pnl,
mid,
),
)
return
def _sync_monitor_lots_from_ctp(
conn, mid: int, sym: str, direction: str, mode: str, *, ctp: Optional[dict] = None,
) -> None:
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, ctp=ctp, capital=_capital(conn),
)
def _compose_position_row(
conn,
*,
@@ -519,44 +569,63 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
) -> Optional[dict]:
if not mon and not ctp:
return None
if ctp:
sym = (ctp.get("symbol") or "").strip()
direction = ctp.get("direction") or "long"
lots = int(ctp.get("lots") or 0)
if lots <= 0:
return None
entry = float(ctp.get("avg_price") or 0)
float_pnl = ctp.get("pnl")
if float_pnl is not None:
float_pnl = round(float(float_pnl), 2)
source_label = "CTP 柜台"
else:
if mon:
sym = (mon.get("symbol") or "").strip()
direction = mon.get("direction") or "long"
lots = int(mon.get("lots") or 0)
if lots <= 0:
return None
entry = float(mon.get("entry_price") or 0)
float_pnl = None
source_label = "本地监控"
source_label = monitor_source_label(mon.get("monitor_type"))
open_time = (mon.get("open_time") or "").strip()
open_time_source = "order"
margin = mon.get("margin")
position_pct = mon.get("position_pct")
mark = mon.get("mark_price")
float_pnl = mon.get("float_pnl")
if float_pnl is not None:
float_pnl = round(float(float_pnl), 2)
else:
sym = (ctp.get("symbol") or "").strip()
direction = ctp.get("direction") or "long"
lots = int(ctp.get("lots") or 0)
entry = float(ctp.get("avg_price") or 0)
source_label = "CTP 柜台"
open_time = (ctp.get("open_time") or "").strip()
open_time_source = "ctp"
margin = None
position_pct = None
mark = None
float_pnl = ctp.get("pnl")
if float_pnl is not None:
float_pnl = round(float(float_pnl), 2)
if lots <= 0:
return None
if ctp:
if ctp.get("pnl") is not None:
float_pnl = round(float(ctp["pnl"]), 2)
if not mon:
ctp_lots = int(ctp.get("lots") or 0)
if ctp_lots > 0:
lots = ctp_lots
if float(ctp.get("avg_price") or 0) > 0:
entry = float(ctp.get("avg_price") or 0)
ctp_margin = float(ctp.get("margin") or 0)
if (margin is None or float(margin or 0) <= 0) and ctp_margin > 0:
margin = ctp_margin
codes = ths_to_codes(sym)
tick = calc_order_tick_metrics(sym, lots, entry)
sl = float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None
tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None
ctp_open = (ctp.get("open_time") or "").strip() if ctp else ""
if ctp and ctp_open:
open_time = ctp_open
open_time_source = "ctp"
else:
open_time = ((mon.get("open_time") or "") if mon else "")
open_time_source = "local" if open_time else ""
holding = _holding_duration(open_time, now_iso) if open_time else ""
mark = None
if not fast and ctp_status(mode).get("connected"):
mark = ctp_get_tick_price(mode, sym)
if not fast and (mark is None or mark <= 0) and codes:
if (mark is None or float(mark or 0) <= 0) and not fast and ctp_status(mode).get("connected"):
live_mark = ctp_get_tick_price(mode, sym)
if live_mark and live_mark > 0:
mark = live_mark
if (mark is None or float(mark or 0) <= 0) and not fast and codes:
mark = fetch_price(
sym,
codes.get("market_code", ""),
@@ -581,15 +650,21 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
direction, entry, sl if sl is not None else entry,
tp if tp is not None else entry, lots, mark, capital, sym,
)
ctp_margin = float(ctp.get("margin") or 0) if ctp else 0.0
est_margin = pos_metrics.get("margin")
margin = ctp_margin if ctp_margin > 0 else est_margin
margin_source = "ctp" if ctp_margin > 0 else "estimate"
position_pct = (
round(float(margin) / capital * 100, 2)
if capital > 0 and margin
else pos_metrics.get("position_pct")
)
if margin is None or float(margin or 0) <= 0:
ctp_margin = float(ctp.get("margin") or 0) if ctp else 0.0
est_margin = pos_metrics.get("margin")
margin = ctp_margin if ctp_margin > 0 else est_margin
margin_source = "ctp" if ctp_margin > 0 else "estimate"
else:
margin_source = "ctp"
if position_pct is None or float(position_pct or 0) <= 0:
position_pct = (
round(float(margin) / capital * 100, 2)
if capital > 0 and margin
else pos_metrics.get("position_pct")
)
else:
position_pct = float(position_pct)
order_st = monitor_order_status(
mon or {}, mode=mode, ths_code=sym, direction=direction,
)
@@ -709,9 +784,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
elif key in ctp_by_key:
used_ctp_keys.add(key)
if ctp and mon and not fast:
_sync_monitor_lots_from_ctp(
_sync_monitor_from_ctp(
conn, int(mon["id"]), mon.get("symbol") or "",
mon.get("direction") or "long", mode, ctp=ctp,
capital=capital,
)
mon = _find_active_monitor(conn, mon.get("symbol") or "", mon.get("direction") or "long") or mon
try:
@@ -1408,9 +1484,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
conn.close()
return jsonify({"ok": False, "error": "开启移动保本须填写止损价"}), 400
if offset.startswith("open"):
from zoneinfo import ZoneInfo
sl = d.get("stop_loss")
tp = d.get("take_profit")
trailing_be = 1 if d.get("trailing_be") else 0
open_ts = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
mid = _upsert_open_monitor(
conn,
sym=sym,
@@ -1420,12 +1498,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
sl=sl,
tp=tp,
trailing_be=trailing_be,
open_time=open_ts,
monitor_type="manual",
)
conn.commit()
_push_position_snapshot_async()
_push_position_snapshot_async(fast=True)
import time
time.sleep(2.0)
_sync_monitor_lots_from_ctp(conn, mid, sym, direction, mode)
time.sleep(1.5)
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, capital=_capital(conn),
)
mon_row = conn.execute(
"SELECT * FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
+11 -4
View File
@@ -41,6 +41,10 @@ MONITOR_ORDER_COLUMNS = (
"ALTER TABLE trade_order_monitors ADD COLUMN trailing_be INTEGER DEFAULT 0",
"ALTER TABLE trade_order_monitors ADD COLUMN initial_stop_loss REAL",
"ALTER TABLE trade_order_monitors ADD COLUMN trailing_r_locked INTEGER DEFAULT 0",
"ALTER TABLE trade_order_monitors ADD COLUMN margin REAL",
"ALTER TABLE trade_order_monitors ADD COLUMN position_pct REAL",
"ALTER TABLE trade_order_monitors ADD COLUMN mark_price REAL",
"ALTER TABLE trade_order_monitors ADD COLUMN float_pnl REAL",
)
TRADE_RESULTS = ("止损", "止盈", "移动止盈", "保本止盈", "手动平仓")
@@ -146,13 +150,16 @@ def _release_close(monitor_id: int) -> None:
_closing_monitors.discard(monitor_id)
def _monitor_type_label(raw: str) -> str:
def monitor_source_label(raw: str) -> str:
"""持仓展示用来源文案。"""
mapping = {
"manual": "期货下单",
"trend": "趋势回调",
"roll": "顺势加仓",
"ctp_sync": "CTP 柜台",
}
return mapping.get(raw or "", raw or "程序监控")
key = (raw or "manual").strip().lower()
return mapping.get(key, raw or "期货下单")
def _result_for_close(mon: dict, reason: str) -> str:
@@ -284,7 +291,7 @@ def _write_trade_log(
open_time=(mon.get("open_time") or "").strip(),
symbol_name=mon.get("symbol_name") or sym,
market_code=mon.get("market_code") or "",
monitor_type=_monitor_type_label(mon.get("monitor_type") or ""),
monitor_type=monitor_source_label(mon.get("monitor_type") or ""),
capital=capital,
)
@@ -324,7 +331,7 @@ def write_manual_close_trade_log(
open_time=(mon.get("open_time") or open_time).strip(),
symbol_name=mon.get("symbol_name") or symbol_name,
market_code=mon.get("market_code") or market_code,
monitor_type=_monitor_type_label(mon.get("monitor_type") or ""),
monitor_type=monitor_source_label(mon.get("monitor_type") or ""),
capital=capital,
)
return
+1 -1
View File
@@ -693,7 +693,7 @@
(row.sync_pending ? ' · <span class="text-muted">同步柜台中…</span>' : '');
var feeLabel = row.fee_source === 'ctp' ? '手续费(柜台)' : '手续费';
var marginLabel = row.margin_source === 'ctp' ? '占用保证金(柜台)' : '占用保证金';
var openLabel = row.open_time_source === 'ctp' ? '开仓(柜台)' : '开仓';
var openLabel = '开仓';
return (
'<div class="pos-card">' +
'<div class="pos-card-head"><div><div class="title">' + row.symbol + ' <span class="badge dir">' + dirBadge + '</span></div>' +