Files
qihuo/install_trading.py
T
dekun 19676943d0 Align margin display with CTP counter rates and position margin.
Read margin ratios from CTP instrument query and margin-rate API instead of vnpy ContractData (which lacks ratios). Keep occupied margin on position UseMargin; use per-lot max rate for recommend table.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-29 10:21:44 +08:00

3257 lines
132 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""期货下单、可开仓品种、策略交易路由注册。"""
from __future__ import annotations
import json
import logging
import threading
from datetime import datetime
from typing import Any, Callable, Optional
from flask import flash, jsonify, redirect, render_template, request, url_for, Response, stream_with_context
from contract_specs import calc_position_metrics, get_contract_spec
from fee_specs import calc_fee_breakdown
from kline_stream import sse_format
from market_sessions import is_night_trading_session, is_trading_session, trading_session_clock
from position_sizing import (
MODE_AMOUNT,
MODE_FIXED,
DEFAULT_MAX_ORDER_LOTS,
calc_lots_by_amount,
calc_lots_by_risk,
calc_margin_usage_pct,
cap_lots_for_margin_budget,
calc_order_tick_metrics,
normalize_sizing_mode,
)
from product_recommend import (
assert_product_allowed_for_capital,
should_apply_small_account_scope,
small_account_margin_recommendations,
small_account_scope_hint,
SMALL_ACCOUNT_SCOPE_LABEL,
)
from recommend_store import (
recommend_payload,
refresh_recommend_cache,
)
from recommend_stream import recommend_hub, schedule_recommend_refresh, start_recommend_worker
from position_stream import position_hub, start_position_worker
from ctp_settings import is_ctp_auto_connect_enabled
from ctp_reconnect import start_ctp_reconnect_worker
from ctp_premarket_connect import start_ctp_premarket_connect_worker
from ctp_fee_worker import start_ctp_fee_worker
from pending_order_worker import start_pending_order_worker
from order_pending import (
cancel_pending_monitor,
pending_auto_cancel_remaining,
pending_monitor_has_live_order,
reconcile_pending_orders,
)
from db_conn import execute_retry
from sl_tp_guard import (
cancel_monitor_exit_orders,
ensure_monitor_order_columns,
monitor_order_status,
monitor_source_label,
place_monitor_exit_orders,
reconcile_monitors_without_position,
start_sl_tp_guard_worker,
write_manual_close_trade_log,
)
from risk.account_risk_lib import (
assert_can_open,
get_risk_status,
on_mood_journal_freeze,
on_user_initiated_close,
parse_mood_issues,
reduce_cooloff_after_journal,
trading_day_label,
)
from strategy.strategy_db import init_strategy_tables
from strategy.strategy_roll_lib import avg_entry_after_add, preview_roll
from strategy.strategy_snapshot_lib import list_snapshots, save_snapshot
from strategy.strategy_trend_lib import (
compute_trend_plan_futures,
enrich_trend_plan_preview,
normalize_trend_period,
trend_dca_level_reached,
trend_period_label,
trend_strategy_periods,
)
from strategy.strategy_snapshot_lib import STRATEGY_ROLL, STRATEGY_TREND
from symbols import ths_to_codes, resolve_main_contract, PRODUCTS, PRODUCT_CATEGORIES, position_symbol_meta
from trading_context import (
TRADING_MODE_LIVE,
TRADING_MODE_SIM,
get_account_capital,
get_fixed_amount,
get_fixed_lots,
get_max_margin_pct,
get_pending_order_timeout_min,
get_pending_order_timeout_sec,
get_recommend_capital,
get_roll_max_margin_pct,
get_risk_percent,
get_sizing_mode,
get_trailing_be_tick_buffer,
get_trading_mode,
is_ctp_connected,
trading_mode_label,
)
from ctp_symbol import ths_to_vnpy_symbol
from ctp_trading_state import position_key, trading_state
from vnpy_bridge import (
_ctp_td_lock,
ctp_cancel_order,
ctp_connect,
ctp_account_margin_used,
ctp_estimate_margin_one_lot,
ctp_get_account,
ctp_get_tick_price,
ctp_list_active_orders,
ctp_list_positions,
ctp_list_trades,
ctp_status,
execute_order,
get_bridge,
set_position_refresh_callback,
set_tick_sl_tp_callback,
set_ctp_connected_callback,
)
# Logger for this module, keyed by the module's import name.
logger = logging.getLogger(__name__)
def install_trading(app, *, login_required, require_nav, get_db, get_setting, set_setting, fetch_price, send_wechat_msg):
"""注册交易相关路由。"""
_nav = require_nav
def _sizing_mode_label(mode: str) -> str:
    """Return the Chinese display label for a position-sizing mode.

    The mode is normalized first; the amount mode maps to "固定金额" and every
    other normalized value maps to "固定手数".
    """
    is_amount_mode = normalize_sizing_mode(mode) == MODE_AMOUNT
    return "固定金额" if is_amount_mode else "固定手数"
def _symbol_display_fields(sym: str) -> dict:
    """Return the display fields (name, exchange, main-contract flag) for a symbol.

    The display name falls back to ``sym`` itself when the symbol metadata has
    no name; it is exposed under both ``symbol`` and ``symbol_name``.
    """
    info = position_symbol_meta(sym)
    display_name = info.get("name") or sym
    fields = dict(symbol=display_name, symbol_name=display_name)
    fields["symbol_exchange"] = info.get("exchange") or ""
    fields["symbol_is_main"] = bool(info.get("is_main"))
    return fields
def _schedule_recommend_refresh() -> None:
    """Hand a recommendation refresh to ``schedule_recommend_refresh``.

    The scheduler receives the database path plus callables that read the
    capital, main-contract quote, trading mode, margin cap, sizing mode and
    fixed lot count whenever the scheduler chooses to invoke them.
    """
    from db_conn import DB_PATH

    def _init_tables(c):
        return init_strategy_tables(c)

    def _current_mode():
        return get_trading_mode(get_setting)

    def _current_max_margin_pct():
        return get_max_margin_pct(get_setting)

    def _current_sizing_mode():
        return get_sizing_mode(get_setting)

    def _current_fixed_lots():
        return get_fixed_lots(get_setting)

    schedule_recommend_refresh(
        db_path=DB_PATH,
        get_capital_fn=_recommend_capital,
        quote_fn=_main_quote,
        init_tables_fn=_init_tables,
        get_mode_fn=_current_mode,
        get_max_margin_pct_fn=_current_max_margin_pct,
        get_sizing_mode_fn=_current_sizing_mode,
        get_fixed_lots_fn=_current_fixed_lots,
    )
def _recommend_payload(conn) -> dict:
    """Build the recommendation payload from the current user settings.

    Reads the trading mode, recommendation capital, margin cap, sizing mode
    and fixed lot count, then forwards them to ``recommend_payload``.
    """
    trading_mode = get_trading_mode(get_setting)
    capital = _recommend_capital(conn)
    margin_cap = get_max_margin_pct(get_setting)
    sizing = get_sizing_mode(get_setting)
    lots = get_fixed_lots(get_setting)
    return recommend_payload(
        conn,
        live_capital=capital,
        max_margin_pct=margin_cap,
        trading_mode=trading_mode,
        sizing_mode=sizing,
        fixed_lots=lots,
    )
def _recommend_capital(conn) -> float:
    """Return the capital used for recommendations, per ``get_recommend_capital``."""
    capital = get_recommend_capital(conn, get_setting)
    return capital
def _settings_dict() -> dict:
    """Snapshot the trading settings as a dict.

    The risk percent and margin cap are converted to strings; the trading
    mode and sizing mode are passed through as returned by their getters.
    """
    settings: dict = {}
    settings["trading_mode"] = get_trading_mode(get_setting)
    settings["position_sizing_mode"] = get_sizing_mode(get_setting)
    settings["risk_percent"] = str(get_risk_percent(get_setting))
    settings["max_margin_pct"] = str(get_max_margin_pct(get_setting))
    return settings
def _capital(conn) -> float:
    """Return the account capital as reported by ``get_account_capital``."""
    account_capital = get_account_capital(conn, get_setting)
    return account_capital
def _main_quote(product_ths: str) -> Optional[dict]:
    """Return a quote dict for the main contract of a product.

    Args:
        product_ths: value matched against the ``"ths"`` field of ``PRODUCTS``.

    Returns:
        ``None`` when no product matches or no main contract resolves;
        otherwise a dict with ``ths_code``, ``price``, ``display`` and
        ``name``. ``price`` stays ``None`` when the contract code cannot be
        mapped by ``ths_to_codes``.
    """
    product = next(
        (item for item in PRODUCTS if item["ths"] == product_ths),
        None,
    )
    if product is None:
        return None

    contract = resolve_main_contract(product)
    if not contract:
        return None

    ths_code = contract.get("ths_code") or ""
    lookup = ths_to_codes(ths_code)
    if lookup:
        last_price = fetch_price(
            ths_code,
            lookup.get("market_code", ""),
            lookup.get("sina_code", ""),
        )
    else:
        last_price = None

    quote = {"ths_code": ths_code, "price": last_price}
    quote["display"] = contract.get("display") or ths_code
    quote["name"] = contract.get("name") or product.get("name")
    return quote
def _ctp_account(mode: str) -> dict:
    """Return the CTP account snapshot for ``mode``, or ``{}`` on failure.

    This is a best-effort read: any error raised by ``ctp_get_account`` is
    logged at debug level and replaced by an empty dict, so callers never
    have to handle bridge exceptions themselves.
    """
    try:
        return ctp_get_account(mode)
    except Exception:
        # Keep the best-effort contract, but leave a trace for diagnosis
        # instead of discarding the failure silently.
        logger.debug("ctp_get_account failed (mode=%s)", mode, exc_info=True)
        return {}
def _ctp_positions(
    mode: str,
    *,
    refresh_if_empty: bool = True,
    refresh_margin: bool = False,
) -> list:
    """Return the CTP position list for ``mode``, or ``[]`` on failure.

    Args:
        mode: trading mode forwarded to ``ctp_list_positions``.
        refresh_if_empty: forwarded unchanged to ``ctp_list_positions``.
        refresh_margin: forwarded unchanged to ``ctp_list_positions``.

    Any error raised by the bridge is logged at debug level and replaced by
    an empty list, so callers can always iterate the result.
    """
    try:
        return ctp_list_positions(
            mode,
            refresh_if_empty=refresh_if_empty,
            refresh_margin=refresh_margin,
        )
    except Exception:
        # Best-effort read: record the failure rather than hiding it.
        logger.debug("ctp_list_positions failed (mode=%s)", mode, exc_info=True)
        return []
def _ctp_pos_to_ths_code(p: dict) -> str:
    """Map a CTP position/order dict to a THS-style contract code.

    Resolution order: the ``ths_code`` from ``ths_to_codes``; then, when an
    exchange is present, ``CtpBridge._vnpy_sym_to_ths``; finally the raw
    symbol itself. Returns ``""`` when the dict carries no symbol.
    """
    raw_symbol = (p.get("symbol") or "").strip()
    exchange = (p.get("exchange") or "").strip()
    if raw_symbol == "":
        return ""

    mapped = ths_to_codes(raw_symbol)
    if mapped:
        return mapped.get("ths_code") or raw_symbol

    if not exchange:
        return raw_symbol

    from vnpy_bridge import CtpBridge

    converted = CtpBridge._vnpy_sym_to_ths(raw_symbol, exchange)
    return converted if converted else raw_symbol
def _resolve_position_margin(
    *,
    sym: str,
    direction: str,
    lots: int,
    entry: float,
    mode: str,
    ctp: Optional[dict] = None,
    mon_margin: Optional[float] = None,
    est_margin: Optional[float] = None,
) -> tuple[Optional[float], str]:
    """Pick the occupied margin for a position and report its source.

    Sources are tried in order: the margin carried by the CTP position, a
    per-lot figure from ``ctp_estimate_margin_one_lot`` (only while
    connected), the caller-supplied local estimate, and finally the margin
    cached on the monitor row (only while disconnected).

    Returns:
        ``(margin, source)`` where ``source`` is ``"ctp"``, ``"estimate"`` or
        ``"db"``; ``(None, "estimate")`` when no source yields a positive
        value.
    """
    counter_margin = 0.0
    if ctp:
        counter_margin = float(ctp.get("margin") or 0)
    if counter_margin > 0:
        return round(counter_margin, 2), "ctp"

    is_connected = bool(ctp_status(mode).get("connected"))

    # Resolve the code used for the per-lot query.
    if ctp:
        lookup_symbol = _ctp_pos_to_ths_code(ctp) or sym
    else:
        lookup_symbol = sym
        mapped = ths_to_codes(sym)
        if mapped and mapped.get("ths_code"):
            lookup_symbol = mapped["ths_code"]

    if is_connected and lookup_symbol and entry > 0 and lots > 0:
        one_lot = ctp_estimate_margin_one_lot(
            mode, lookup_symbol, entry, direction=direction,
        )
        if one_lot and one_lot > 0:
            return round(one_lot * lots, 2), "ctp"

    if est_margin and float(est_margin) > 0:
        return round(float(est_margin), 2), "estimate"

    if not is_connected:
        if mon_margin and float(mon_margin) > 0:
            return round(float(mon_margin), 2), "db"

    return None, "estimate"
def _apply_account_margin_to_rows(
    rows: list[dict],
    mode: str,
    capital: float,
) -> list[dict]:
    """Fill in margin for rows that lack a CTP-sourced margin.

    Rows that already carry a positive margin with ``margin_source == "ctp"``
    keep it and only get ``position_pct`` recomputed. The remaining
    account-level used margin is split across the other non-pending rows in
    proportion to entry price x multiplier x lots (evenly when no weight can
    be computed); the last row receives the rounding remainder.

    The rows are mutated in place and the same list is returned. Nothing is
    changed when CTP is disconnected.
    """
    if not ctp_status(mode).get("connected"):
        return rows

    def _counter_backed(item: dict) -> bool:
        has_margin = float(item.get("margin") or 0) > 0
        return has_margin and item.get("margin_source") == "ctp"

    live_rows = []
    for item in rows:
        if item.get("order_state") == "pending":
            continue
        if int(item.get("lots") or 0) > 0:
            live_rows.append(item)
    if not live_rows:
        return rows

    missing = [item for item in live_rows if not _counter_backed(item)]

    for item in live_rows:
        if _counter_backed(item) and capital > 0:
            own_margin = float(item.get("margin") or 0)
            item["position_pct"] = round(own_margin / capital * 100, 2)

    if not missing:
        return rows

    account_used = ctp_account_margin_used(mode)
    if not account_used:
        return rows

    covered = sum(
        float(item.get("margin") or 0)
        for item in live_rows
        if _counter_backed(item)
    )
    if covered > 0:
        pool = max(0.0, float(account_used) - covered)
    else:
        pool = float(account_used)
    if pool <= 0:
        return rows

    def _notional(item: dict) -> float:
        code = (item.get("symbol_code") or "").strip()
        qty = int(item.get("lots") or 0)
        price = float(item.get("entry_price") or 0)
        if not (code and qty > 0 and price > 0):
            return 0.0
        return price * get_contract_spec(code)["mult"] * qty

    weights = [_notional(item) for item in missing]
    weight_total = sum(weights)
    count = len(missing)
    handed_out = 0.0
    for idx, (item, weight) in enumerate(zip(missing, weights)):
        if weight_total <= 0:
            share = round(pool / count, 2)
        elif idx == count - 1:
            # Last row absorbs whatever rounding left over.
            share = round(pool - handed_out, 2)
        else:
            share = round(pool * weight / weight_total, 2)
        handed_out += share
        item["margin"] = share
        item["margin_source"] = "ctp"
        if capital > 0:
            item["position_pct"] = round(share / capital * 100, 2)
    return rows
def _persist_ctp_snapshot_to_monitors(
    conn,
    rows: list[dict],
    mode: str,
) -> None:
    """Write margin, position share and open fee from rows to their monitors.

    Only runs while CTP is connected. Pending rows and rows without a
    ``monitor_id`` are skipped, as are rows where all three values are
    ``None``. Each update targets an active ``trade_order_monitors`` row and
    keeps the stored value wherever the new one is ``None``. A failed update
    is logged at debug level and does not stop the loop.
    """
    status = ctp_status(mode)
    if not status.get("connected"):
        return
    ensure_monitor_order_columns(conn)

    update_sql = """UPDATE trade_order_monitors SET
margin=COALESCE(?, margin),
position_pct=COALESCE(?, position_pct),
open_fee=COALESCE(?, open_fee)
WHERE id=? AND status='active'"""

    for entry in rows:
        monitor_id = entry.get("monitor_id")
        if not monitor_id or entry.get("order_state") == "pending":
            continue
        snapshot = (
            entry.get("margin"),
            entry.get("position_pct"),
            entry.get("est_fee"),
        )
        if all(value is None for value in snapshot):
            continue
        try:
            execute_retry(conn, update_sql, (*snapshot, int(monitor_id)))
        except Exception as exc:
            logger.debug("persist monitor ctp snapshot %s: %s", monitor_id, exc)
def _ensure_monitors_from_ctp(conn, mode: str) -> None:
    """Make sure every CTP position has an active local monitor row.

    Does nothing while CTP is disconnected. For each position with lots > 0:

    * if an active monitor exists (or a closed one can be revived), it is
      re-synced from the CTP position;
    * otherwise a new ``ctp_sync`` monitor is created, reusing the stop-loss /
      take-profit / trailing settings of the most recent closed monitor for
      the same contract and direction when one is found.
    """
    if not ctp_status(mode).get("connected"):
        return

    for p in _ctp_positions(mode, refresh_if_empty=True):
        lots = int(p.get("lots") or 0)
        if lots <= 0:
            continue
        direction = p.get("direction") or "long"
        ths = _ctp_pos_to_ths_code(p)
        if not ths:
            continue

        existing = _find_or_revive_monitor(conn, ths, direction)
        if existing:
            _sync_monitor_from_ctp(
                conn, int(existing["id"]), ths, direction, mode, ctp=p,
                capital=_capital(conn),
            )
            continue

        sl, tp, trailing_be, initial_sl = _restore_sl_tp_from_closed(conn, ths, direction)
        ctp_open = (p.get("open_time") or "").strip()
        mid = _upsert_open_monitor(
            conn,
            sym=ths,
            direction=direction,
            lots=lots,
            price=float(p.get("avg_price") or 0),
            sl=sl,
            tp=tp,
            trailing_be=trailing_be,
            ctp_open_time=ctp_open or None,
            monitor_type="ctp_sync",
        )
        # The insert path seeds initial_stop_loss from the stop-loss; put back
        # the original initial stop recovered from the closed monitor.
        if initial_sl is not None and sl is not None:
            conn.execute(
                "UPDATE trade_order_monitors SET initial_stop_loss=? WHERE id=?",
                (initial_sl, mid),
            )
def _restore_recent_pending_monitors(conn, mode: str) -> None:
    """Flip today's latest closed manual order monitor back to ``pending``.

    Intended for the case where a restart or a lost vnpy order cache left an
    opening order untracked. The function returns without changes when CTP
    is disconnected, when any pending monitor already exists, when no closed
    manual monitor with a ``vt_order_id`` was opened today, when an active
    monitor exists for the same contract and direction, or when CTP already
    holds a matching position.
    """
    if not ctp_status(mode).get("connected"):
        return

    has_pending = conn.execute(
        "SELECT 1 FROM trade_order_monitors WHERE status='pending' LIMIT 1"
    ).fetchone()
    if has_pending:
        return

    day_prefix = datetime.now().strftime("%Y-%m-%d")
    candidate = conn.execute(
        """SELECT * FROM trade_order_monitors
WHERE status='closed' AND monitor_type='manual'
AND vt_order_id IS NOT NULL AND vt_order_id != ''
AND open_time LIKE ?
ORDER BY id DESC LIMIT 1""",
        (f"{day_prefix}%",),
    ).fetchone()
    if not candidate:
        return

    monitor = dict(candidate)
    contract = monitor.get("symbol") or ""
    side = (monitor.get("direction") or "long").strip().lower()
    if _find_active_monitor(conn, contract, side):
        return

    # A matching CTP position means the order already filled; nothing to restore.
    already_held = any(
        int(pos.get("lots") or 0) > 0
        and (pos.get("direction") or "long") == side
        and _match_ctp_symbol(pos.get("symbol") or "", contract)
        for pos in _ctp_positions(mode, refresh_if_empty=False)
    )
    if already_held:
        return

    conn.execute(
        "UPDATE trade_order_monitors SET status='pending' WHERE id=?",
        (monitor["id"],),
    )
    logger.info("恢复挂单监控 id=%s sym=%s", monitor.get("id"), contract)
def _match_ctp_symbol(ctp_sym: str, ths: str) -> bool:
    """Tell whether a CTP symbol and a THS code name the same contract.

    Comparison is case-insensitive and tries, in order: exact equality,
    equality of the part before the first ``"."``, and equality after
    converting either side with ``ths_to_vnpy_symbol``.

    An empty or ``None`` symbol on either side never matches: an unknown
    contract must not be treated as equal to another unknown contract.
    """
    a = (ctp_sym or "").lower()
    b = (ths or "").lower()
    if not a or not b:
        return False
    if a == b:
        return True
    if a.split(".")[0] == b.split(".")[0]:
        return True
    try:
        vnpy_sym, _ = ths_to_vnpy_symbol(ths)
        if a == vnpy_sym.lower():
            return True
    except Exception:
        # A symbol that cannot be converted simply does not match this way.
        pass
    try:
        vnpy_sym, _ = ths_to_vnpy_symbol(ctp_sym)
        if vnpy_sym.lower() == b.split(".")[0]:
            return True
    except Exception:
        # Same as above for the reverse conversion.
        pass
    return False
def _open_commission_from_ctp_trades(
    mode: str, sym: str, direction: str,
) -> Optional[float]:
    """Sum the commission of the opening trades for one position.

    Only trades whose offset is ``open``, whose position direction equals
    ``direction`` and whose symbol matches ``sym`` are counted.

    Returns:
        The total rounded to 2 decimals, or ``None`` when CTP is
        disconnected, the trade list cannot be read, or no matching trade
        carries a positive commission.
    """
    if not ctp_status(mode).get("connected"):
        return None
    try:
        trades = ctp_list_trades(mode)
    except Exception:
        # Best-effort read: record the failure rather than hiding it.
        logger.debug("ctp_list_trades failed (mode=%s)", mode, exc_info=True)
        return None

    # Normalized once; it does not change between trades.
    wanted_direction = (direction or "long").strip().lower()

    total = 0.0
    has_commission = False
    for t in trades:
        if (t.get("offset") or "").strip().lower() != "open":
            continue
        pos_dir = (
            t.get("position_direction") or t.get("direction") or "long"
        ).strip().lower()
        if pos_dir != wanted_direction:
            continue
        if not _match_ctp_symbol(t.get("symbol") or "", sym):
            continue
        comm = float(t.get("commission") or 0)
        total += comm
        if comm > 0:
            has_commission = True
    return round(total, 2) if has_commission else None
def _holding_duration(open_time: str, now_iso: str) -> str:
    """Format how long a position has been held.

    Both timestamps are trimmed, have ``"T"`` replaced by a space and are cut
    to 19 characters before being passed to ``app.calc_holding_duration``.
    Returns ``""`` when either timestamp is empty or when anything fails.
    """
    def _normalize(stamp):
        return (stamp or "").strip().replace("T", " ")[:19]

    try:
        from app import calc_holding_duration

        started, ended = _normalize(open_time), _normalize(now_iso)
        if started and ended:
            return calc_holding_duration(started, ended)
        return ""
    except Exception:
        return ""
def _restore_sl_tp_from_closed(conn, sym: str, direction: str) -> tuple:
    """Recover stop-loss / take-profit settings from a closed monitor.

    Scans the 80 most recent closed monitors, newest first, for one with the
    same direction and a matching symbol that has a stop-loss or a
    take-profit.

    Returns:
        ``(stop_loss, take_profit, trailing_be, initial_stop_loss)`` from the
        first such monitor, or ``(None, None, 0, None)`` when none qualifies.
    """
    wanted = (direction or "long").strip().lower()
    closed_rows = conn.execute(
        "SELECT symbol, direction, stop_loss, take_profit, trailing_be, initial_stop_loss "
        "FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 80"
    ).fetchall()

    for record in map(dict, closed_rows):
        usable = (
            (record.get("direction") or "long") == wanted
            and _match_ctp_symbol(sym, record.get("symbol") or "")
            and not (
                record.get("stop_loss") is None
                and record.get("take_profit") is None
            )
        )
        if usable:
            return (
                record.get("stop_loss"),
                record.get("take_profit"),
                int(record.get("trailing_be") or 0),
                record.get("initial_stop_loss"),
            )
    return None, None, 0, None
def _ctp_position_keys(mode: str) -> set[tuple[str, str]]:
    """Return ``(lower-cased symbol, direction)`` for each CTP position with lots > 0.

    A missing direction is treated as ``"long"``.
    """
    return {
        ((pos.get("symbol") or "").lower(), pos.get("direction") or "long")
        for pos in _ctp_positions(mode)
        if int(pos.get("lots") or 0) > 0
    }
def _monitor_matches_ctp_position(mon: dict, position_keys: set[tuple[str, str]]) -> bool:
    """Tell whether a monitor corresponds to any of the given CTP position keys.

    A key matches when its direction equals the monitor's direction (default
    ``"long"``) and its symbol matches the monitor's symbol.
    """
    mon_symbol = mon.get("symbol") or ""
    mon_direction = mon.get("direction") or "long"
    return any(
        pos_direction == mon_direction
        and _match_ctp_symbol(pos_symbol, mon_symbol)
        for pos_symbol, pos_direction in position_keys
    )
def _sync_trade_monitors_with_ctp(conn, mode: str) -> int:
    """Reconcile local monitors against CTP positions.

    Thin wrapper that returns whatever ``reconcile_monitors_without_position``
    returns. Per the original note, that helper closes monitors that have no
    matching CTP position and cancels their leftover stop-loss / take-profit
    orders.
    """
    reconciled = reconcile_monitors_without_position(conn, mode)
    return reconciled
def _effective_active_position_count(conn, mode: str) -> int:
    """Count open positions.

    While CTP is connected the count is the number of distinct CTP position
    keys; otherwise it is the number of active rows in
    ``trade_order_monitors``.
    """
    connected = ctp_status(mode).get("connected")
    if not connected:
        count_row = conn.execute(
            "SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='active'"
        ).fetchone()
        return int(count_row["n"] or 0)
    return len(_ctp_position_keys(mode))
def _build_pending_orders(conn, mode: str) -> list[dict]:
    """Collect every pending item shown alongside positions.

    Items are appended in three groups:

    1. stop-loss / take-profit levels of active monitors,
    2. pending monitors (opening orders not yet filled),
    3. active orders reported by CTP, only while connected.

    Every item carries the symbol display fields; items from groups 2 and 3
    also carry cancel flags reflecting whether a trading session is open.
    """
    def _direction_label(direction: str) -> str:
        return "做多" if direction == "long" else "做空"

    # Evaluated once so every row of one response shows the same flag.
    cancel_allowed = is_trading_session()
    pending: list[dict] = []

    for r in conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC"
    ).fetchall():
        mon = dict(r)
        sym = mon.get("symbol") or ""
        direction = mon.get("direction") or "long"
        base = {
            "symbol_code": sym,
            "direction": direction,
            "direction_label": _direction_label(direction),
            "lots": int(mon.get("lots") or 0),
            "source": "monitor",
            "monitor_id": mon.get("id"),
            **_symbol_display_fields(sym),
        }
        sl = mon.get("stop_loss")
        tp = mon.get("take_profit")
        if sl is not None:
            pending.append({
                **base,
                "order_kind": "stop_loss",
                "label": "止损监控",
                "price": float(sl),
            })
        if tp is not None:
            pending.append({
                **base,
                "order_kind": "take_profit",
                "label": "止盈监控",
                "price": float(tp),
            })

    for r in conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
    ).fetchall():
        mon = dict(r)
        sym = mon.get("symbol") or ""
        direction = mon.get("direction") or "long"
        pending.append({
            "symbol_code": sym,
            "direction": direction,
            "direction_label": _direction_label(direction),
            "lots": int(mon.get("lots") or 0),
            "price": float(mon.get("order_price") or mon.get("entry_price") or 0),
            "order_kind": "open_pending",
            "label": "开仓挂单中",
            "source": "monitor",
            "monitor_id": mon.get("id"),
            "can_cancel_order": cancel_allowed,
            "cancel_allowed": cancel_allowed,
            **_symbol_display_fields(sym),
        })

    if ctp_status(mode).get("connected"):
        for o in _ctp_active_orders(mode):
            sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "")
            # The label is derived from the defaulted direction, so an order
            # without a direction is shown as long in both fields.
            direction = o.get("direction") or "long"
            offset_s = (o.get("offset") or "").upper()
            label = "平仓委托" if "CLOSE" in offset_s else "委托挂单"
            pending.append({
                "symbol_code": sym,
                "direction": direction,
                "direction_label": _direction_label(direction),
                "lots": int(o.get("lots") or 0),
                "price": float(o.get("price") or 0),
                "order_kind": "limit",
                "label": label,
                "source": "ctp",
                "order_id": o.get("order_id"),
                "vt_order_id": o.get("vt_order_id") or o.get("order_id"),
                "can_cancel_order": cancel_allowed,
                "cancel_allowed": cancel_allowed,
                # Supplies "symbol", "symbol_name", "symbol_exchange" and
                # "symbol_is_main".
                **_symbol_display_fields(sym),
            })
    return pending
def _ctp_active_orders(mode: str) -> list:
    """Return the active CTP orders for ``mode``, or ``[]`` on failure.

    Any error raised by ``ctp_list_active_orders`` is logged at debug level
    and replaced by an empty list, so callers can always iterate the result.
    """
    try:
        return ctp_list_active_orders(mode)
    except Exception:
        # Best-effort read: record the failure rather than hiding it.
        logger.debug("ctp_list_active_orders failed (mode=%s)", mode, exc_info=True)
        return []
def _canonical_position_key(symbol: str, direction: str, exchange: str = "") -> str:
    """Build the position key for a symbol, direction and optional exchange.

    The symbol is converted with ``ths_to_vnpy_symbol``; when no exchange is
    given, the one returned by that conversion is used. If the conversion
    fails, the lower-cased symbol is used instead. Direction defaults to
    ``"long"`` and is lower-cased; the exchange is upper-cased.
    """
    raw_symbol = (symbol or "").strip()
    side = (direction or "long").strip().lower()
    venue = (exchange or "").strip().upper()
    try:
        converted, inferred_venue = ths_to_vnpy_symbol(raw_symbol)
    except Exception:
        key_symbol = raw_symbol.lower()
    else:
        key_symbol = converted
        venue = venue or inferred_venue
    return position_key(venue, key_symbol, side)
def _position_key_from_ctp(p: dict) -> str:
    """Build the position key straight from a CTP position dict.

    Missing exchange or symbol become ``""``; a missing direction becomes
    ``"long"``.
    """
    exchange = p.get("exchange") or ""
    symbol = p.get("symbol") or ""
    direction = p.get("direction") or "long"
    return position_key(exchange, symbol, direction)
def _monitor_position_key(mon: dict, exchange: str = "") -> str:
    """Build the position key for a monitor row.

    Args:
        mon: monitor row; its ``symbol`` and ``direction`` fields are used.
        exchange: optional exchange override; when empty, the exchange is
            taken from the symbol conversion.

    The normalization is exactly the one done by ``_canonical_position_key``,
    so this delegates to it instead of repeating the logic.
    """
    return _canonical_position_key(
        mon.get("symbol") or "",
        mon.get("direction") or "long",
        exchange,
    )
def _monitors_by_position_key(conn) -> dict[str, dict]:
    """Index active monitors by position key.

    Rows are read newest first (``id DESC``); when several monitors share a
    key, the first one read (highest id) is kept.
    """
    ensure_monitor_order_columns(conn)
    active_rows = conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC"
    ).fetchall()
    by_key: dict[str, dict] = {}
    for raw in active_rows:
        monitor = dict(raw)
        by_key.setdefault(_monitor_position_key(monitor), monitor)
    return by_key
def _find_active_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
    """Return the newest active monitor for a contract and direction, if any.

    ``direction`` is trimmed and lower-cased (default ``"long"``) before
    being compared with each row's stored direction.
    """
    wanted = (direction or "long").strip().lower()
    active_rows = conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC"
    ).fetchall()
    monitors = (dict(raw) for raw in active_rows)
    return next(
        (
            monitor
            for monitor in monitors
            if (monitor.get("direction") or "long") == wanted
            and _match_ctp_symbol(symbol, monitor.get("symbol") or "")
        ),
        None,
    )
def _find_pending_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
    """Return the newest pending monitor for a contract and direction, if any.

    Per the original note, a pending opening order still carries stop-loss
    and trailing break-even metadata, so it has to be linked to the CTP
    position for display.
    """
    wanted = (direction or "long").strip().lower()
    pending_rows = conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
    ).fetchall()
    for candidate in map(dict, pending_rows):
        same_side = (candidate.get("direction") or "long") == wanted
        if same_side and _match_ctp_symbol(symbol, candidate.get("symbol") or ""):
            return candidate
    return None
def _has_pending_monitors(conn) -> bool:
    """Return ``True`` when at least one monitor row has status ``pending``."""
    first_pending = conn.execute(
        "SELECT 1 FROM trade_order_monitors WHERE status='pending' LIMIT 1"
    ).fetchone()
    return bool(first_pending)
def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
    """Reactivate the newest closed monitor for a contract and direction.

    Used when the counter still holds a position but the local monitor was
    closed by mistake. Looks at the 40 most recent closed monitors and
    revives the first one with the same direction, a matching symbol and
    lots > 0.

    Returns:
        The revived row as a dict with ``status`` set to ``"active"``, or
        ``None`` when nothing qualifies.
    """
    wanted = (direction or "long").strip().lower()
    closed_rows = conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 40"
    ).fetchall()

    for record in map(dict, closed_rows):
        revivable = (
            (record.get("direction") or "long") == wanted
            and _match_ctp_symbol(symbol, record.get("symbol") or "")
            and int(record.get("lots") or 0) > 0
        )
        if not revivable:
            continue
        conn.execute(
            "UPDATE trade_order_monitors SET status='active' WHERE id=?",
            (record["id"],),
        )
        record["status"] = "active"
        logger.info(
            "恢复误关闭监控 id=%s sym=%s dir=%s",
            record.get("id"), record.get("symbol"), wanted,
        )
        return record
    return None
def _find_or_revive_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
    """Return the active monitor for a position, reviving a closed one if needed.

    The closed-monitor lookup only runs when no active monitor is found.
    """
    return (
        _find_active_monitor(conn, symbol, direction)
        or _revive_closed_monitor(conn, symbol, direction)
    )
def _close_all_monitors_for_sym_dir(conn, symbol: str, direction: str) -> None:
    """Mark every active or pending monitor for a contract and direction closed."""
    wanted = (direction or "long").strip().lower()
    open_rows = conn.execute(
        "SELECT id, symbol, direction FROM trade_order_monitors "
        "WHERE status IN ('active', 'pending')"
    ).fetchall()
    for record in open_rows:
        same_side = (record["direction"] or "long") == wanted
        if same_side and _match_ctp_symbol(symbol, record["symbol"] or ""):
            conn.execute(
                "UPDATE trade_order_monitors SET status='closed' WHERE id=?",
                (record["id"],),
            )
def _close_duplicate_monitors(conn, symbol: str, direction: str, keep_id: int) -> None:
    """Close every active monitor for a position except the one to keep.

    Args:
        symbol: contract to match.
        direction: position direction (default ``"long"``).
        keep_id: id of the monitor that must stay active.
    """
    wanted = (direction or "long").strip().lower()
    active_rows = conn.execute(
        "SELECT id, symbol, direction FROM trade_order_monitors WHERE status='active'"
    ).fetchall()
    for record in active_rows:
        is_duplicate = (
            int(record["id"]) != int(keep_id)
            and (record["direction"] or "long") == wanted
            and _match_ctp_symbol(symbol, record["symbol"] or "")
        )
        if is_duplicate:
            conn.execute(
                "UPDATE trade_order_monitors SET status='closed' WHERE id=?",
                (record["id"],),
            )
def _upsert_open_monitor(
    conn,
    *,
    sym: str,
    direction: str,
    lots: int,
    price: float,
    sl,
    tp,
    trailing_be: int,
    ctp_open_time: Optional[str] = None,
    open_time: Optional[str] = None,
    monitor_type: str = "manual",
    status: str = "active",
    vt_order_id: Optional[str] = None,
    order_price: Optional[float] = None,
) -> int:
    """Create or update the monitor row for an opening order / position.

    An existing active monitor for the same contract and direction is updated
    in place; failing that, an existing pending one; otherwise a new row is
    inserted. When the resulting status is ``active``, other active monitors
    for the same contract and direction are closed.

    Args:
        conn: database connection.
        sym: contract code stored on the row.
        direction: position direction.
        lots: lot count to store.
        price: entry price to store.
        sl: stop-loss; ``None`` or ``""`` means "not provided".
        tp: take-profit; ``None`` or ``""`` means "not provided".
        trailing_be: trailing break-even flag; a falsy value keeps the stored
            one when updating.
        ctp_open_time: open time reported by CTP; only used when
            ``monitor_type`` is ``"ctp_sync"`` and ``open_time`` is not given.
        open_time: explicit open time; takes precedence when given.
        monitor_type: monitor type; ``"manual"`` keeps the stored type when
            updating.
        status: ``"pending"`` or ``"active"``; anything else becomes
            ``"active"``.
        vt_order_id: order id to store; falls back to the stored one when
            updating.
        order_price: order price; defaults to ``price``.

    Returns:
        The id of the updated or inserted monitor row.
    """
    ensure_monitor_order_columns(conn)
    codes = ths_to_codes(sym) or {}
    sl_f = float(sl) if sl not in (None, "") else None
    tp_f = float(tp) if tp not in (None, "") else None
    now_s = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    status_val = status if status in ("pending", "active") else "active"
    order_px = float(order_price if order_price is not None else price)
    existing = _find_active_monitor(conn, sym, direction)
    if not existing:
        # No active monitor: fall back to the newest matching pending one.
        for r in conn.execute(
            "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
        ).fetchall():
            row = dict(r)
            if (row.get("direction") or "long") != (direction or "long").strip().lower():
                continue
            if _match_ctp_symbol(sym, row.get("symbol") or ""):
                existing = row
                break
    if existing:
        mid = int(existing["id"])
        existing_status = (existing.get("status") or "active").strip().lower()
        # Never demote an already-active monitor back to pending.
        if existing_status == "active" and status_val == "pending":
            status_val = "active"
        initial_sl = existing.get("initial_stop_loss")
        # Values not supplied by the caller are inherited from the stored row.
        if sl_f is None:
            sl_f = float(existing["stop_loss"]) if existing.get("stop_loss") is not None else None
        if tp_f is None:
            tp_f = float(existing["take_profit"]) if existing.get("take_profit") is not None else None
        # The initial stop is only seeded once; a stored value is kept.
        if sl_f is not None and initial_sl is None:
            initial_sl = sl_f
        if not trailing_be:
            trailing_be = int(existing.get("trailing_be") or 0)
        # Open time precedence: explicit argument > CTP time (ctp_sync only)
        # > stored value > now.
        open_time_val = (existing.get("open_time") or "").strip() or now_s
        if open_time:
            open_time_val = open_time
        elif monitor_type == "ctp_sync" and ctp_open_time:
            open_time_val = ctp_open_time
        vt_val = vt_order_id or existing.get("vt_order_id")
        conn.execute(
            """UPDATE trade_order_monitors SET
symbol=?, symbol_name=?, market_code=?, lots=?, entry_price=?,
stop_loss=?, take_profit=?, initial_stop_loss=?, trailing_be=?, open_time=?,
monitor_type=?, status=?, vt_order_id=?, order_price=?
WHERE id=?""",
            (
                sym,
                codes.get("name", sym),
                codes.get("market_code", ""),
                lots,
                price,
                sl_f,
                tp_f,
                initial_sl,
                trailing_be,
                open_time_val,
                monitor_type if monitor_type != "manual" else (existing.get("monitor_type") or "manual"),
                status_val,
                vt_val,
                order_px,
                mid,
            ),
        )
    else:
        # New row. Open time precedence: explicit argument > CTP time
        # (ctp_sync only) > now.
        if open_time:
            open_time_val = open_time
        elif monitor_type == "ctp_sync" and ctp_open_time:
            open_time_val = ctp_open_time
        else:
            open_time_val = now_s
        conn.execute(
            """INSERT INTO trade_order_monitors (
symbol, symbol_name, market_code, direction, lots, entry_price,
stop_loss, take_profit, initial_stop_loss, trailing_be,
open_time, monitor_type, status, vt_order_id, order_price
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                sym,
                codes.get("name", sym),
                codes.get("market_code", ""),
                direction,
                lots,
                price,
                sl_f,
                tp_f,
                # initial_stop_loss starts out equal to the stop-loss.
                sl_f,
                trailing_be,
                open_time_val,
                monitor_type,
                status_val,
                vt_order_id,
                order_px,
            ),
        )
        mid = int(conn.execute("SELECT last_insert_rowid()").fetchone()[0])
    if status_val == "active":
        # Keep a single active monitor per contract and direction.
        _close_duplicate_monitors(conn, sym, direction, mid)
    return mid
def _sync_monitor_from_ctp(
    conn,
    mid: int,
    sym: str,
    direction: str,
    mode: str,
    *,
    ctp: Optional[dict] = None,
    capital: float = 0.0,
) -> None:
    """Copy the live CTP position figures onto one monitor row.

    Lots, average price, margin, position share, mark price, floating PnL and
    open fee are written to ``trade_order_monitors``. The stored open time is
    kept unless the monitor type is ``ctp_sync`` and CTP reports an open time.

    Args:
        conn: database connection.
        mid: id of the monitor row to update.
        sym: contract code of the monitor.
        direction: position direction of the monitor.
        mode: trading mode used for all CTP lookups.
        ctp: the CTP position to sync from; when omitted, the current CTP
            positions are read and searched.
        capital: account capital used for the position share; ``0`` leaves
            the share empty.

    Only the first position with lots > 0 that matches the direction and
    symbol is synced; nothing is written when none matches.
    """
    positions = [ctp] if ctp else _ctp_positions(mode, refresh_if_empty=False, refresh_margin=True)
    for p in positions:
        if not p or int(p.get("lots") or 0) <= 0:
            continue
        if (p.get("direction") or "long") != direction:
            continue
        if not _match_ctp_symbol(p.get("symbol") or "", sym):
            continue

        row = conn.execute(
            "SELECT open_time, monitor_type FROM trade_order_monitors WHERE id=?", (mid,),
        ).fetchone()
        db_open = (row["open_time"] or "").strip() if row else ""
        monitor_type = (row["monitor_type"] or "manual").strip().lower() if row else "manual"
        ctp_open = (p.get("open_time") or "").strip() or None
        # Only monitors created by CTP sync take their open time from CTP.
        open_time_val = db_open
        if monitor_type == "ctp_sync" and ctp_open:
            open_time_val = ctp_open

        lots = int(p.get("lots") or 0)
        entry = float(p.get("avg_price") or 0)
        float_pnl = p.get("pnl")
        if float_pnl is not None:
            float_pnl = round(float(float_pnl), 2)

        # Mark price: live tick while connected, else the entry price.
        mark = None
        if ctp_status(mode).get("connected"):
            mark = ctp_get_tick_price(mode, sym)
        if mark is None or mark <= 0:
            mark = entry if entry else None

        est = calc_position_metrics(
            direction, entry, entry, entry, lots, mark or entry, capital, sym,
        ).get("margin")
        # The position's own margin field is read inside the resolver.
        margin, _src = _resolve_position_margin(
            sym=sym,
            direction=direction,
            lots=lots,
            entry=entry,
            mode=mode,
            ctp=p,
            est_margin=est,
        )
        position_pct = None
        if margin and capital > 0:
            position_pct = round(float(margin) / float(capital) * 100, 2)

        # Prefer the commission from CTP trades; fall back to the local fee
        # calculation.
        open_commission = _open_commission_from_ctp_trades(mode, sym, direction)
        if open_commission is None:
            fee_info = calc_fee_breakdown(
                sym, entry, entry, lots, open_time_val or "", "",
                trading_mode=mode,
            )
            open_commission = fee_info.get("open_fee")

        execute_retry(
            conn,
            """UPDATE trade_order_monitors SET lots=?, entry_price=?,
open_time=?, margin=?, position_pct=?, mark_price=?, float_pnl=?,
open_fee=?
WHERE id=?""",
            (
                lots,
                entry,
                open_time_val,
                margin,
                position_pct,
                float(mark) if mark else None,
                float_pnl,
                open_commission,
                mid,
            ),
        )
        return
def _sync_monitor_lots_from_ctp(
    conn, mid: int, sym: str, direction: str, mode: str, *, ctp: Optional[dict] = None,
) -> None:
    """Sync one monitor from CTP using the current account capital.

    Convenience wrapper around ``_sync_monitor_from_ctp`` that reads the
    capital itself.
    """
    current_capital = _capital(conn)
    _sync_monitor_from_ctp(
        conn, mid, sym, direction, mode, ctp=ctp, capital=current_capital,
    )
def _compose_position_row(
    conn,
    *,
    mon: Optional[dict],
    ctp: Optional[dict],
    mode: str,
    capital: float,
    now_iso: str,
    fast: bool = False,
) -> Optional[dict]:
    """Build the display row for one open position.

    The row is assembled from the local monitor (``mon``) and/or the CTP
    position (``ctp``); at least one must be given.

    Args:
        conn: database connection (not used in the visible body).
        mon: monitor row, or ``None`` for a position known only to CTP.
        ctp: CTP position dict, or ``None`` when only the monitor is known.
        mode: trading mode used for CTP lookups and fee calculation.
        capital: account capital used for the position share.
        now_iso: current time, used for holding duration and fee calculation.
        fast: when true, the ``fetch_price`` fallback for the mark price is
            skipped.

    Returns:
        The row dict, or ``None`` when neither source is given or lots <= 0.
    """
    if not mon and not ctp:
        return None
    if mon:
        # Monitor present: start from the values stored on the monitor.
        sym = (mon.get("symbol") or "").strip()
        direction = mon.get("direction") or "long"
        lots = int(mon.get("lots") or 0)
        entry = float(mon.get("entry_price") or 0)
        source_label = monitor_source_label(mon.get("monitor_type"))
        open_time = (mon.get("open_time") or "").strip()
        open_time_source = "order"
        margin = mon.get("margin")
        position_pct = mon.get("position_pct")
        mark = mon.get("mark_price")
        float_pnl = mon.get("float_pnl")
        if float_pnl is not None:
            float_pnl = round(float(float_pnl), 2)
    else:
        # CTP-only position: no cached margin, share or mark price.
        sym = (ctp.get("symbol") or "").strip()
        direction = ctp.get("direction") or "long"
        lots = int(ctp.get("lots") or 0)
        entry = float(ctp.get("avg_price") or 0)
        source_label = "CTP 柜台"
        open_time = (ctp.get("open_time") or "").strip()
        open_time_source = "ctp"
        margin = None
        position_pct = None
        mark = None
        float_pnl = ctp.get("pnl")
        if float_pnl is not None:
            float_pnl = round(float(float_pnl), 2)
    if lots <= 0:
        return None
    if ctp:
        # The PnL reported by CTP wins over the cached monitor value.
        if ctp.get("pnl") is not None:
            float_pnl = round(float(ctp["pnl"]), 2)
        if not mon:
            ctp_lots = int(ctp.get("lots") or 0)
            if ctp_lots > 0:
                lots = ctp_lots
            # NOTE(review): nesting of this avg_price check under `if not mon`
            # is reconstructed from unindented source — confirm against the
            # original file.
            if float(ctp.get("avg_price") or 0) > 0:
                entry = float(ctp.get("avg_price") or 0)
        ctp_margin = float(ctp.get("margin") or 0)
        if (margin is None or float(margin or 0) <= 0) and ctp_margin > 0:
            margin = ctp_margin
    codes = ths_to_codes(sym)
    tick = calc_order_tick_metrics(sym, lots, entry, trading_mode=mode)
    sl = float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None
    tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None
    holding = _holding_duration(open_time, now_iso) if open_time else ""
    # Mark price: live CTP tick while connected; otherwise fetch_price when
    # there is no cached mark and `fast` is off.
    # NOTE(review): the `elif` is paired with the connection check here; the
    # pairing is reconstructed from unindented source — confirm.
    if ctp_status(mode).get("connected"):
        live_mark = ctp_get_tick_price(mode, sym)
        if live_mark and live_mark > 0:
            mark = live_mark
    elif (mark is None or float(mark or 0) <= 0) and not fast and codes:
        mark = fetch_price(
            sym,
            codes.get("market_code", ""),
            codes.get("sina_code", ""),
        )
    # Last resort: use the entry price as mark.
    if mark is None or mark <= 0:
        mark = entry if entry else None
    close_est = float(mark) if mark and mark > 0 else entry
    if float_pnl is None and mark and entry:
        pos_tmp = calc_position_metrics(
            direction, entry, sl or entry, tp or entry, lots, mark, capital, sym,
        )
        float_pnl = pos_tmp.get("float_pnl")
    fee_info = calc_fee_breakdown(
        sym, entry, close_est, lots, open_time or now_iso, now_iso, trading_mode=mode,
    )
    # Open fee precedence: CTP trade commission > positive fee cached on the
    # monitor > local fee calculation.
    open_commission = _open_commission_from_ctp_trades(mode, sym, direction)
    if open_commission is None and mon and mon.get("open_fee") is not None:
        cached_fee = float(mon.get("open_fee") or 0)
        if cached_fee > 0:
            open_commission = cached_fee
    if open_commission is not None:
        display_fee = open_commission
        fee_source = "ctp"
    else:
        display_fee = fee_info["open_fee"]
        fee_source = fee_info.get("fee_source") or "local"
    # Net estimate: floating PnL minus the estimated closing fee.
    est_net = None
    if float_pnl is not None:
        est_net = round(float(float_pnl) - fee_info["close_fee"], 2)
    pos_metrics = calc_position_metrics(
        direction, entry, sl if sl is not None else entry,
        tp if tp is not None else entry, lots, mark, capital, sym,
    )
    mon_margin = margin
    margin, margin_source = _resolve_position_margin(
        sym=sym,
        direction=direction,
        lots=lots,
        entry=entry,
        mode=mode,
        ctp=ctp,
        mon_margin=mon_margin if isinstance(mon_margin, (int, float)) else None,
        est_margin=pos_metrics.get("margin"),
    )
    # Position share: from the resolved margin when possible, else the locally
    # computed share, else the cached value.
    if margin and capital > 0:
        position_pct = round(float(margin) / float(capital) * 100, 2)
    elif position_pct is None or float(position_pct or 0) <= 0:
        position_pct = pos_metrics.get("position_pct")
    elif position_pct is not None:
        position_pct = float(position_pct)
    order_st = monitor_order_status(
        mon or {}, mode=mode, ths_code=sym, direction=direction,
    )
    pending_for_row: list[dict] = []
    if sl is not None:
        pending_for_row.append({
            "order_kind": "stop_loss",
            "label": "止损监控",
            "price": sl,
            "lots": lots,
            "source": "monitor",
            "monitor_id": mon["id"] if mon else None,
        })
    if tp is not None:
        pending_for_row.append({
            "order_kind": "take_profit",
            "label": "止盈监控",
            "price": tp,
            "lots": lots,
            "source": "monitor",
            "monitor_id": mon["id"] if mon else None,
        })
    row_key = _canonical_position_key(
        sym, direction, (ctp or {}).get("exchange") or "",
    )
    return {
        "key": row_key,
        "position_key": row_key,
        "source": "ctp",
        "source_label": source_label,
        "sync_pending": False,
        "monitor_id": mon["id"] if mon else None,
        "symbol_code": sym,
        **_symbol_display_fields(sym),
        "direction": direction,
        "direction_label": "做多" if direction == "long" else "做空",
        "lots": lots,
        "entry_price": entry,
        "stop_loss": sl,
        "take_profit": tp,
        "open_time": open_time or None,
        "open_time_source": open_time_source or None,
        "holding_duration": holding or None,
        "mark_price": mark,
        "current_price": mark,
        "margin": margin,
        "margin_source": margin_source,
        "position_pct": position_pct,
        # Risk/reward figures are only reported when the matching level is set.
        "risk_amount": pos_metrics.get("risk_amount") if sl is not None else None,
        "reward_amount": pos_metrics.get("reward_amount") if tp is not None else None,
        "risk_pct": pos_metrics.get("risk_pct") if sl is not None else None,
        "rr_ratio": pos_metrics.get("rr_ratio") if sl is not None and tp is not None else None,
        "float_pnl": float_pnl,
        "est_fee": display_fee,
        "est_fee_open": display_fee,
        "est_fee_close": fee_info["close_fee"],
        "est_fee_close_type": fee_info["close_type"],
        "fee_source": fee_source,
        "est_pnl_net": est_net,
        "sl_order_active": order_st.get("sl_monitoring"),
        "tp_order_active": order_st.get("tp_monitoring"),
        "sl_monitoring": order_st.get("sl_monitoring"),
        "tp_monitoring": order_st.get("tp_monitoring"),
        "can_place_orders": False,
        "tick_value_total": tick.get("tick_value_total"),
        "price_precision": tick.get("price_precision"),
        "tick_size": tick.get("tick_size"),
        "can_close": True,
        "close_allowed": is_trading_session(),
        "pending_orders": pending_for_row,
        "trailing_be": bool(mon.get("trailing_be")) if mon else False,
        "trailing_r_locked": int(mon.get("trailing_r_locked") or 0) if mon else 0,
    }
def _compose_pending_row(
    mon: dict,
    *,
    mode: str,
    capital: float,
    now_iso: str,
) -> Optional[dict]:
    """Build the display row for a locally tracked pending open order.

    Args:
        mon: A ``trade_order_monitors`` record as a dict.
        mode: Trading mode; accepted for parity with the other row builders
            and not used in this function.
        capital: Account capital forwarded to ``calc_position_metrics``.
        now_iso: Current time string used for the holding duration.

    Returns:
        The row dict, or ``None`` when the record has no symbol or a
        non-positive lot count.
    """
    sym = (mon.get("symbol") or "").strip()
    direction = (mon.get("direction") or "long").strip().lower()
    lots = int(mon.get("lots") or 0)
    if not sym or lots <= 0:
        return None
    order_price = float(mon.get("order_price") or mon.get("entry_price") or 0)
    sl = float(mon["stop_loss"]) if mon.get("stop_loss") is not None else None
    tp = float(mon["take_profit"]) if mon.get("take_profit") is not None else None
    # A missing (or zero) SL/TP falls back to the order price so the metrics
    # call still gets numbers; the risk/reward fields are blanked below.
    pos_metrics = calc_position_metrics(
        direction, order_price, sl or order_price, tp or order_price, lots, order_price, capital, sym,
    )
    open_time = (mon.get("open_time") or "").strip()
    timeout_sec = get_pending_order_timeout_sec(get_setting)
    remain = pending_auto_cancel_remaining(mon, timeout_sec=timeout_sec)
    # Read the session state once so the two cancel flags can never disagree.
    in_session = is_trading_session()
    has_sl = sl is not None
    has_tp = tp is not None
    return {
        "key": f"{_canonical_position_key(sym, direction)}:pending:{mon.get('id')}",
        "order_state": "pending",
        "source": "pending",
        "source_label": "委托挂单中",
        "sync_pending": True,
        "monitor_id": mon.get("id"),
        "symbol_code": sym,
        **_symbol_display_fields(sym),
        "direction": direction,
        "direction_label": "做多" if direction == "long" else "做空",
        "lots": lots,
        "entry_price": order_price,
        "order_price": order_price,
        "stop_loss": sl,
        "take_profit": tp,
        "open_time": open_time or None,
        "holding_duration": _holding_duration(open_time, now_iso) if open_time else None,
        "mark_price": order_price,
        "current_price": order_price,
        "margin": pos_metrics.get("margin"),
        "margin_source": "estimate",
        "position_pct": pos_metrics.get("position_pct"),
        "risk_amount": pos_metrics.get("risk_amount") if has_sl else None,
        "reward_amount": pos_metrics.get("reward_amount") if has_tp else None,
        "rr_ratio": pos_metrics.get("rr_ratio") if has_sl and has_tp else None,
        "float_pnl": None,
        "est_fee": None,
        "can_close": False,
        "close_allowed": False,
        "can_cancel_order": in_session,
        "cancel_allowed": in_session,
        "auto_cancel_sec": remain,
        "pending_timeout_sec": timeout_sec,
        "pending_timeout_min": max(1, timeout_sec // 60),
        "vt_order_id": mon.get("vt_order_id"),
        "sl_order_active": False,
        "tp_order_active": False,
        "sl_monitoring": has_sl,
        "tp_monitoring": has_tp,
        "can_place_orders": False,
        "pending_orders": [],
        "trailing_be": bool(mon.get("trailing_be")),
        "trailing_r_locked": int(mon.get("trailing_r_locked") or 0),
    }
def _compose_ctp_open_order_row(
    o: dict,
    *,
    mode: str,
    capital: float,
    now_iso: str,
) -> Optional[dict]:
    """Build the display row for a working *opening* order read from CTP.

    Args:
        o: Order dict from the counter.
        mode: Trading mode; not used in this function.
        capital: Account capital forwarded to ``calc_position_metrics``.
        now_iso: Current time string, reported as the row's ``open_time``.

    Returns:
        The row dict, or ``None`` when the order carries a non-empty offset
        that does not contain ``OPEN``, has no symbol, or has a non-positive
        lot count.
    """
    offset_u = (o.get("offset") or "").upper()
    if offset_u and "OPEN" not in offset_u:
        return None
    sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "").strip()
    direction = (o.get("direction") or "long").strip().lower()
    lots = int(o.get("lots") or 0)
    if not sym or lots <= 0:
        return None
    order_price = float(o.get("price") or 0)
    # No SL/TP is known for a raw counter order, so the order price stands in
    # for every price argument.
    pos_metrics = calc_position_metrics(
        direction, order_price, order_price, order_price, lots, order_price, capital, sym,
    )
    timeout_sec = get_pending_order_timeout_sec(get_setting)
    # Read the session state once so the two cancel flags can never disagree.
    in_session = is_trading_session()
    return {
        "key": f"{_canonical_position_key(sym, direction)}:pending:ctp:{o.get('order_id') or ''}",
        "order_state": "pending",
        "source": "ctp",
        "source_label": "委托挂单",
        "sync_pending": True,
        "monitor_id": None,
        "order_id": o.get("order_id"),
        "vt_order_id": o.get("vt_order_id") or o.get("order_id"),
        "symbol_code": sym,
        **_symbol_display_fields(sym),
        "direction": direction,
        "direction_label": "做多" if direction == "long" else "做空",
        "lots": lots,
        "entry_price": order_price,
        "order_price": order_price,
        "stop_loss": None,
        "take_profit": None,
        "open_time": now_iso,
        "holding_duration": None,
        "mark_price": order_price,
        "current_price": order_price,
        "margin": pos_metrics.get("margin"),
        "margin_source": "estimate",
        "position_pct": pos_metrics.get("position_pct"),
        "float_pnl": None,
        "est_fee": None,
        "can_close": False,
        "close_allowed": False,
        "can_cancel_order": in_session,
        "cancel_allowed": in_session,
        "pending_timeout_sec": timeout_sec,
        "pending_timeout_min": max(1, timeout_sec // 60),
        "sl_order_active": False,
        "tp_order_active": False,
        "sl_monitoring": False,
        "tp_monitoring": False,
        "can_place_orders": False,
        "pending_orders": [],
        "trailing_be": False,
        "trailing_r_locked": 0,
    }
def _reconcile_pending(conn, mode: str, *, capital: float = 0.0) -> dict[str, int]:
    """Call ``reconcile_pending_orders`` with this module's CTP helpers wired in.

    The symbol matcher, monitor sync function, position lister and the
    configured pending-order timeout are supplied here so callers only pass
    the connection, mode and capital. The delegate's result is returned as is.
    """
    hooks = {
        "match_symbol_fn": _match_ctp_symbol,
        "sync_monitor_fn": _sync_monitor_from_ctp,
        "list_positions_fn": _ctp_positions,
    }
    timeout_sec = get_pending_order_timeout_sec(get_setting)
    return reconcile_pending_orders(
        conn,
        mode,
        capital=capital,
        timeout_sec=timeout_sec,
        **hooks,
    )
def _build_active_orders(
    conn,
    *,
    mode: str,
    capital: float,
    now_iso: str,
) -> list[dict]:
    """List working orders: counter orders plus live local pending monitors.

    Counter orders are read only while CTP is connected; when it is not, an
    empty list is returned and local pending records are not shown.

    Args:
        conn: Open database connection (used for ``trade_order_monitors``).
        mode: Trading mode used for the CTP status and order lookups.
        capital: Account capital forwarded to the row builders.
        now_iso: Current time string forwarded to the row builders.

    Returns:
        Order rows; a local pending monitor is added only when it still has a
        live counter order and no pending row for the same symbol/direction
        is already listed.
    """
    orders: list[dict] = []
    if not ctp_status(mode).get("connected"):
        return orders

    # Normalise to a list once so every later use iterates safely even when
    # both sources return nothing.
    ctp_orders = trading_state.get_active_orders() or _ctp_active_orders(mode) or []

    def _pair(row: dict) -> str:
        return f"{row.get('symbol_code') or ''}:{row.get('direction') or ''}"

    seen_keys: set[str] = set()
    # symbol:direction pairs that already have a pending row, for O(1)
    # duplicate checks instead of rescanning ``orders`` per monitor.
    pending_pairs: set[str] = set()

    for o in ctp_orders:
        try:
            row = _compose_ctp_open_order_row(
                o, mode=mode, capital=capital, now_iso=now_iso,
            )
            if not row:
                # Not an opening order: fall back to the generic builder.
                row = _compose_ctp_order_row_any(
                    o, mode=mode, capital=capital, now_iso=now_iso,
                )
            if row:
                orders.append(row)
                seen_keys.add(row.get("key") or "")
                if row.get("order_state") == "pending":
                    pending_pairs.add(_pair(row))
        except Exception as exc:
            logger.warning("compose ctp order row failed: %s", exc)

    # Index counter orders under both identifiers a monitor may have stored.
    ctp_active_map: dict[str, dict] = {}
    for o in ctp_orders:
        for key in (o.get("order_id"), o.get("vt_order_id")):
            if key:
                ctp_active_map[str(key)] = o

    for r in conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
    ).fetchall():
        mon = dict(r)
        try:
            if not pending_monitor_has_live_order(
                mon,
                active_orders=ctp_active_map,
                active_order_list=ctp_orders,
            ):
                continue
            prow = _compose_pending_row(
                mon, mode=mode, capital=capital, now_iso=now_iso,
            )
            if not prow or prow.get("key") in seen_keys:
                continue
            pk = _pair(prow)
            if pk not in pending_pairs:
                orders.append(prow)
                pending_pairs.add(pk)
        except Exception as exc:
            logger.warning("compose pending order row failed: %s", exc)
    return orders
def _compose_ctp_order_row_any(
    o: dict,
    *,
    mode: str,
    capital: float,
    now_iso: str,
) -> Optional[dict]:
    """Build the display row for any working CTP order, closing orders included.

    Args:
        o: Order dict from the counter.
        mode: Trading mode; not used in this function.
        capital: Account capital forwarded to ``calc_position_metrics``.
        now_iso: Current time string, reported as the row's ``open_time``.

    Returns:
        The row dict, or ``None`` when the order has no symbol or a
        non-positive lot count. Timeout fields are filled only for opening
        orders (empty offset, or an offset containing ``OPEN``).
    """
    sym = _ctp_pos_to_ths_code(o) or (o.get("symbol") or "").strip()
    direction = (o.get("direction") or "long").strip().lower()
    lots = int(o.get("lots") or 0)
    if not sym or lots <= 0:
        return None
    offset_u = (o.get("offset") or "").upper()
    is_open = not offset_u or "OPEN" in offset_u
    order_price = float(o.get("price") or 0)
    pos_metrics = calc_position_metrics(
        direction, order_price, order_price, order_price, lots, order_price, capital, sym,
    )
    label = "开仓委托" if is_open else "平仓委托"
    timeout_sec = get_pending_order_timeout_sec(get_setting)
    ex = o.get("exchange") or ""
    pk = _canonical_position_key(sym, direction, ex)
    # Read the session state once so the two cancel flags can never disagree.
    in_session = is_trading_session()
    return {
        "key": f"{pk}:order:{o.get('order_id') or ''}",
        "order_state": "pending",
        "source": "ctp",
        "source_label": label,
        "sync_pending": False,
        "monitor_id": None,
        "order_id": o.get("order_id"),
        "vt_order_id": o.get("vt_order_id") or o.get("order_id"),
        "symbol_code": sym,
        **_symbol_display_fields(sym),
        "direction": direction,
        "direction_label": "做多" if direction == "long" else "做空",
        "lots": lots,
        "entry_price": order_price,
        "order_price": order_price,
        "stop_loss": None,
        "take_profit": None,
        "open_time": now_iso,
        "mark_price": order_price,
        "current_price": order_price,
        "margin": pos_metrics.get("margin"),
        "margin_source": "estimate",
        "position_pct": pos_metrics.get("position_pct"),
        "float_pnl": None,
        "can_close": False,
        "close_allowed": False,
        "can_cancel_order": in_session,
        "cancel_allowed": in_session,
        "pending_timeout_sec": timeout_sec if is_open else None,
        "pending_timeout_min": max(1, timeout_sec // 60) if is_open else None,
        "sl_order_active": False,
        "tp_order_active": False,
        "sl_monitoring": False,
        "tp_monitoring": False,
        "can_place_orders": False,
        "pending_orders": [],
        "trailing_be": False,
        "trailing_r_locked": 0,
    }
def _build_trading_live_rows(conn, *, fast: bool = False) -> list[dict]:
    """Current positions: CTP is authoritative, SQLite only overlays SL/TP metadata.

    Args:
        conn: Open database connection.
        fast: When true, the CTP position lookup is asked not to refresh and
            the per-position monitor sync is skipped.

    Returns:
        One row per CTP position with positive lots, de-duplicated by
        ``key`` (falling back to ``position_key``).
    """
    from zoneinfo import ZoneInfo
    tz = ZoneInfo("Asia/Shanghai")
    now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M")
    mode = get_trading_mode(get_setting)
    capital = _capital(conn)
    ensure_monitor_order_columns(conn)
    monitor_by_pk = _monitors_by_position_key(conn)
    ctp_list: list[dict] = []
    if ctp_status(mode).get("connected"):
        # Prefer the in-memory trading state; query the bridge only when empty.
        ctp_list = trading_state.get_positions()
        if not ctp_list:
            ctp_list = _ctp_positions(
                mode, refresh_if_empty=not fast, refresh_margin=not fast,
            )
    rows: list[dict] = []
    for p in ctp_list:
        lots = int(p.get("lots") or 0)
        if lots <= 0:
            continue
        pk = p.get("position_key") or _position_key_from_ctp(p)
        mon = monitor_by_pk.get(pk)
        if not mon:
            # No exact key hit: fall back to direction + symbol matching.
            for mk, mv in monitor_by_pk.items():
                if (mv.get("direction") or "long") != (p.get("direction") or "long"):
                    continue
                if _match_ctp_symbol(p.get("symbol") or "", mv.get("symbol") or ""):
                    mon = mv
                    break
        ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "")
        if not mon:
            # Still nothing: look for a pending monitor for this symbol/direction.
            mon = _find_pending_monitor(
                conn, ths, p.get("direction") or "long",
            )
            # NOTE(review): nesting of this branch was reconstructed from a
            # source view without indentation — confirm against the file.
            if mon and not fast:
                _sync_monitor_from_ctp(
                    conn, int(mon["id"]), mon.get("symbol") or ths,
                    mon.get("direction") or p.get("direction") or "long",
                    mode, ctp=p, capital=capital,
                )
                # Re-read after the sync; keep the previous record if no
                # active monitor is found.
                mon = _find_active_monitor(
                    conn, mon.get("symbol") or ths, mon.get("direction") or "long",
                ) or mon
        try:
            row = _compose_position_row(
                conn, mon=mon, ctp=p, mode=mode, capital=capital,
                now_iso=now_iso, fast=fast,
            )
            if row:
                rows.append(row)
        except Exception as exc:
            # One bad position must not drop the whole table.
            logger.warning("compose ctp position row failed: %s", exc)
    # Keep the first row for each key.
    seen: set[str] = set()
    deduped: list[dict] = []
    for row in rows:
        rk = row.get("key") or row.get("position_key") or ""
        if rk in seen:
            continue
        seen.add(rk)
        deduped.append(row)
    return deduped
def _build_trading_live_payload(conn, *, fast: bool = False) -> dict:
    """Assemble the full live-trading payload (positions, orders, status).

    While CTP is connected, pending orders are reconciled on every full build
    and on fast builds only when pending monitors exist; monitors are synced
    with CTP on full builds only.
    """
    from zoneinfo import ZoneInfo

    now_iso = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%dT%H:%M")
    mode = get_trading_mode(get_setting)
    ctp_st = ctp_status(mode)
    capital = _capital(conn)
    full = not fast

    if ctp_st.get("connected"):
        if full or _has_pending_monitors(conn):
            _reconcile_pending(conn, mode, capital=capital)
    if full and ctp_st.get("connected"):
        _sync_trade_monitors_with_ctp(conn, mode)

    position_rows = _build_trading_live_rows(conn, fast=fast)
    active_orders = _build_active_orders(
        conn, mode=mode, capital=capital, now_iso=now_iso,
    )
    position_rows = _apply_account_margin_to_rows(position_rows, mode, capital)
    _persist_ctp_snapshot_to_monitors(conn, position_rows, mode)
    pending_orders = _build_pending_orders(conn, mode)
    active_count = _effective_active_position_count(conn, mode)
    risk = get_risk_status(conn, active_count=active_count)

    payload: dict = {"ok": True}
    payload["rows"] = position_rows
    payload["active_orders"] = active_orders
    payload["pending_orders"] = pending_orders
    payload["capital"] = capital
    payload["ctp_status"] = ctp_st
    payload["trading_mode_label"] = trading_mode_label(get_setting)
    payload["risk_status"] = risk
    payload["trading_session"] = is_trading_session()
    payload["night_session"] = is_night_trading_session()
    payload["session_clock"] = trading_session_clock()
    payload["pending_order_timeout_min"] = get_pending_order_timeout_min(get_setting)
    payload["sync_state"] = trading_state.sync_state
    payload["sync_label"] = trading_state.sync_label()
    return payload
def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict:
    """Build a fresh live-trading payload on its own database connection.

    While CTP is connected, a non-fast refresh first calibrates the bridge's
    trading state under ``_ctp_td_lock``, and each held position's symbol is
    subscribed on the bridge. Both steps are best effort: failures are logged
    at debug level and never abort the refresh.

    Args:
        fast: Skip the calibration step and build the payload in fast mode.

    Returns:
        The payload produced by ``_build_trading_live_payload``.
    """
    mode = get_trading_mode(get_setting)
    if ctp_status(mode).get("connected"):
        if not fast:
            try:
                with _ctp_td_lock:
                    get_bridge().calibrate_trading_state()
            except Exception as exc:
                logger.debug("refresh calibrate: %s", exc)
        held = trading_state.get_positions() or _ctp_positions(mode, refresh_if_empty=False)
        for p in held:
            ths = _ctp_pos_to_ths_code(p)
            if not ths:
                continue
            try:
                get_bridge().subscribe_symbol(ths)
            except Exception as exc:
                # Still best effort, but leave a trace instead of silently
                # discarding the failure.
                logger.debug("refresh subscribe %s: %s", ths, exc)
    conn = get_db()
    try:
        init_strategy_tables(conn)
        payload = _build_trading_live_payload(conn, fast=fast)
        conn.commit()
        return payload
    finally:
        conn.close()
def _push_position_snapshot_async(*, fast: bool = True) -> None:
    """Rebuild and broadcast the positions snapshot on a daemon thread.

    After the positions event, the recommend payload is read on a separate
    connection and broadcast too. Any failure is logged at debug level.
    """

    def _broadcast_recommend() -> None:
        db = get_db()
        try:
            rec = _recommend_payload(db)
            recommend_hub.broadcast("recommend", {"ok": True, **rec})
        finally:
            db.close()

    def _run() -> None:
        try:
            snapshot = _refresh_trading_live_snapshot(fast=fast)
            position_hub.broadcast("positions", snapshot)
            _broadcast_recommend()
        except Exception as exc:
            logger.debug("push position snapshot: %s", exc)

    worker = threading.Thread(target=_run, daemon=True)
    worker.start()
def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None:
    """Tick callback: run the local SL/TP check against the latest price.

    Does nothing while CTP is disconnected. When the check reports a non-zero
    result, the transaction is committed and a fast snapshot push is
    scheduled. Errors are logged at debug level; the connection is always
    closed.
    """
    from sl_tp_guard import check_sl_tp_on_tick
    from db_conn import DB_PATH, connect_db

    mode = get_trading_mode(get_setting)
    if ctp_status(mode).get("connected"):
        db = connect_db(DB_PATH)
        try:
            _init_tables(db)
            capital = _capital(db)
            be_buffer = get_trailing_be_tick_buffer(get_setting)
            triggered = check_sl_tp_on_tick(
                db, mode, exchange, symbol, price,
                capital=capital,
                notify_fn=send_wechat_msg,
                be_tick_mult=be_buffer,
            )
            if not triggered:
                return
            db.commit()
            _push_position_snapshot_async(fast=True)
        except Exception as exc:
            logger.debug("tick sl/tp: %s", exc)
        finally:
            db.close()
def _bootstrap_trading_runtime() -> None:
    """Process startup: read the CTP snapshot and push it; event-driven increments plus periodic full calibration.

    Registers the position-refresh, tick SL/TP and CTP-connected callbacks,
    warms the position snapshot on a background thread, and starts a CTP
    connection when auto-connect is enabled.
    """
    set_position_refresh_callback(
        lambda: _push_position_snapshot_async(fast=True)
    )
    set_tick_sl_tp_callback(_on_tick_sl_tp)
    set_ctp_connected_callback(_on_ctp_connected)
    def _warm() -> None:
        # Build the first full snapshot off the startup thread.
        # NOTE(review): nesting below was reconstructed from a source view
        # without indentation — confirm the snapshot build is unconditional.
        try:
            mode = get_trading_mode(get_setting)
            if ctp_status(mode).get("connected"):
                get_bridge().calibrate_trading_state()
            payload = _refresh_trading_live_snapshot(fast=False)
            position_hub.set_snapshot(payload)
            position_hub.broadcast("positions", payload)
        except Exception as exc:
            logger.warning("bootstrap position snapshot: %s", exc)
    threading.Thread(target=_warm, daemon=True, name="position-bootstrap").start()
    try:
        if is_ctp_auto_connect_enabled(get_setting):
            from vnpy_bridge import ctp_start_connect
            mode = get_trading_mode(get_setting)
            ctp_start_connect(mode, force=False)
    except Exception as exc:
        # Auto-connect is best effort; startup continues without it.
        logger.debug("bootstrap ctp connect: %s", exc)
def _on_ctp_connected(mode: str) -> None:
    """CTP-connected callback: schedule a recommend refresh for the current mode.

    Connections reported for any other trading mode are ignored.
    """
    current_mode = get_trading_mode(get_setting)
    if mode == current_mode:
        _schedule_recommend_refresh()
@app.route("/trade")
@login_required
def trade_page():
    """Redirect ``/trade`` to the positions page."""
    target = url_for("positions")
    return redirect(target)
@app.route("/positions")
@login_required
def positions():
    """Render the positions / trading page (``trade.html``).

    Collects account, risk, CTP status, strategy counters, sizing settings
    and the cached recommend table, and schedules a recommend refresh when
    the cache reports ``needs_refresh``.
    """
    conn = get_db()
    try:
        init_strategy_tables(conn)
        mode = get_trading_mode(get_setting)
        ctp_st = ctp_status(mode)
        capital = _capital(conn)
        recommend_capital = _recommend_capital(conn)
        risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode))
        ctp_acc = _ctp_account(mode) if ctp_st.get("connected") else {}
        active_trend = conn.execute(
            "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1"
        ).fetchone()
        monitor_count = conn.execute(
            "SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='active'"
        ).fetchone()["n"]
        roll_count = conn.execute(
            "SELECT COUNT(*) AS n FROM roll_groups WHERE status='active'"
        ).fetchone()["n"]
        conn.commit()
        sizing = get_sizing_mode(get_setting)
        max_pct = get_max_margin_pct(get_setting)
        rec_cache = _recommend_payload(conn)
        if rec_cache.get("needs_refresh"):
            _schedule_recommend_refresh()
        ctp_connected = is_ctp_connected(get_setting)
        margin_rec = small_account_margin_recommendations()
        # Evaluate the scope once so the flag and the margin table it gates
        # are always consistent.
        small_scope = should_apply_small_account_scope(
            capital, ctp_connected=ctp_connected,
        )
        return render_template(
            "trade.html",
            trading_mode=mode,
            trading_mode_label=trading_mode_label(get_setting),
            capital=capital,
            recommend_capital=recommend_capital,
            risk_status=risk,
            ctp_status=ctp_st,
            ctp_account=ctp_acc,
            active_trend=dict(active_trend) if active_trend else None,
            monitor_count=monitor_count,
            roll_count=roll_count,
            sizing_mode=sizing,
            sizing_mode_label=_sizing_mode_label(sizing),
            fixed_lots=get_fixed_lots(get_setting),
            fixed_amount=get_fixed_amount(get_setting),
            risk_percent=get_risk_percent(get_setting),
            max_margin_pct=max_pct,
            pending_order_timeout_min=get_pending_order_timeout_min(get_setting),
            ctp_auto_connect=is_ctp_auto_connect_enabled(get_setting),
            recommend_rows=rec_cache.get("rows") or [],
            recommend_updated_at=rec_cache.get("updated_at"),
            night_session=is_night_trading_session(),
            small_account_scope=small_scope,
            small_account_scope_hint=small_account_scope_hint(ctp_connected=ctp_connected),
            small_account_margin_rec=margin_rec if small_scope else None,
            session_clock=trading_session_clock(),
            roll_max_margin_pct=get_roll_max_margin_pct(get_setting),
            product_categories=PRODUCT_CATEGORIES,
        )
    finally:
        conn.close()
@app.route("/recommend")
@login_required
def recommend_page():
    """Redirect ``/recommend`` to the recommend anchor on the positions page."""
    positions_url = url_for("positions")
    return redirect(f"{positions_url}#recommend")
@app.route("/api/trading/live")
@login_required
def api_trading_live():
    """Return a fast live-trading payload as JSON and store it as the hub snapshot."""
    db = get_db()
    try:
        init_strategy_tables(db)
        snapshot = _build_trading_live_payload(db, fast=True)
        db.commit()
        position_hub.set_snapshot(snapshot)
        response = jsonify(snapshot)
    finally:
        db.close()
    return response
@app.route("/api/trading/stream")
@login_required
def api_trading_stream():
    """Server-sent-events stream of position updates.

    The first event is the hub's cached snapshot, or a freshly built fast
    snapshot when none is cached. Afterwards hub messages are forwarded as
    they arrive, with a heartbeat comment after 25 seconds of silence.
    """
    from queue import Empty

    def _first_snapshot():
        cached = position_hub.get_snapshot()
        if cached:
            return cached
        fresh = _refresh_trading_live_snapshot(fast=True)
        position_hub.set_snapshot(fresh)
        return fresh

    def generate():
        q = position_hub.subscribe()
        try:
            yield sse_format("positions", _first_snapshot())
            while True:
                try:
                    msg = q.get(timeout=25)
                except Empty:
                    yield ": heartbeat\n\n"
                else:
                    yield sse_format(msg["event"], msg["data"])
        finally:
            # Runs when the client disconnects and the generator is closed.
            position_hub.unsubscribe(q)

    sse_headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return Response(generate(), mimetype="text/event-stream", headers=sse_headers)
@app.route("/api/trading/monitor/upsert", methods=["POST"])
@login_required
def api_trading_monitor_upsert():
    """Add or update the local take-profit / stop-loss monitor for an existing position.

    Responds 400 when the parameters are invalid, the symbol is missing,
    neither SL nor TP is given, trailing break-even is requested without a
    stop loss, or no matching position is found.
    """
    d = request.get_json(silent=True) or {}
    sym = (d.get("symbol_code") or d.get("symbol") or "").strip()
    direction = (d.get("direction") or "long").strip().lower()
    try:
        lots = max(1, int(d.get("lots") or 1))
        entry = float(d.get("entry_price") or d.get("entry") or 0)
        # None and "" both mean "not provided".
        sl = float(d["stop_loss"]) if d.get("stop_loss") not in (None, "") else None
        tp = float(d["take_profit"]) if d.get("take_profit") not in (None, "") else None
    except (TypeError, ValueError, KeyError):
        return jsonify({"ok": False, "error": "参数无效"}), 400
    if not sym:
        return jsonify({"ok": False, "error": "缺少品种代码"}), 400
    if sl is None and tp is None:
        return jsonify({"ok": False, "error": "请至少填写止损或止盈"}), 400
    trailing_on = bool(d.get("trailing_be"))
    if trailing_on and sl is None:
        return jsonify({"ok": False, "error": "移动保本须填写止损价"}), 400
    if trailing_on:
        # Trailing break-even discards any submitted take-profit.
        tp = None
    mode = get_trading_mode(get_setting)
    conn = get_db()
    try:
        init_strategy_tables(conn)
        mon = _find_active_monitor(conn, sym, direction)
        has_pos = bool(mon)
        ths_sym = sym
        if ctp_status(mode).get("connected"):
            # A matching CTP position overrides the submitted lots and entry
            # price and supplies the symbol code to store.
            for p in _ctp_positions(mode, refresh_if_empty=False):
                if int(p.get("lots") or 0) <= 0:
                    continue
                if (p.get("direction") or "long") != direction:
                    continue
                if _match_ctp_symbol(p.get("symbol") or "", sym):
                    has_pos = True
                    lots = int(p.get("lots") or lots)
                    entry = float(p.get("avg_price") or entry or 0)
                    ths_sym = _ctp_pos_to_ths_code(p) or sym
                    break
        if not has_pos:
            return jsonify({"ok": False, "error": "未找到对应持仓"}), 400
        # Keep the existing monitor's trailing flag unless this request turns it on.
        trailing_be = 1 if trailing_on else (
            int(mon.get("trailing_be") or 0) if mon else 0
        )
        mid = _upsert_open_monitor(
            conn,
            sym=ths_sym,
            direction=direction,
            lots=lots,
            price=entry,
            sl=sl,
            tp=tp,
            trailing_be=trailing_be,
        )
        if trailing_on and sl is not None:
            # Record the submitted stop as the initial stop and reset the
            # locked-R counter for the new trailing run.
            conn.execute(
                """UPDATE trade_order_monitors SET
take_profit=NULL, initial_stop_loss=?, trailing_r_locked=0
WHERE id=?""",
                (sl, mid),
            )
        conn.commit()
        _push_position_snapshot_async(fast=False)
        return jsonify({
            "ok": True,
            "monitor_id": mid,
            "message": "止盈止损已保存,程序本地监控",
        })
    finally:
        conn.close()
@app.route("/api/trading/monitor/place-orders", methods=["POST"])
@login_required
def api_trading_monitor_place_orders():
    """Run ``place_monitor_exit_orders`` for one active monitor.

    The monitor is resolved by ``monitor_id`` first, then by symbol and
    direction among active monitors. Responds 400 when CTP is disconnected or
    the operation reports failure, 404 when no monitor is found.
    """
    d = request.get_json(silent=True) or {}
    try:
        monitor_id = int(d.get("monitor_id") or 0)
    except (TypeError, ValueError):
        monitor_id = 0
    conn = get_db()
    try:
        init_strategy_tables(conn)
        ensure_monitor_order_columns(conn)
        mode = get_trading_mode(get_setting)
        if not ctp_status(mode).get("connected"):
            return jsonify({"ok": False, "error": "请先连接 CTP"}), 400

        mon = None
        if monitor_id > 0:
            by_id = conn.execute(
                "SELECT * FROM trade_order_monitors WHERE id=? AND status='active'",
                (monitor_id,),
            ).fetchone()
            mon = dict(by_id) if by_id else None
        if not mon:
            sym = (d.get("symbol_code") or "").strip()
            direction = (d.get("direction") or "long").strip().lower()
            active = conn.execute(
                "SELECT * FROM trade_order_monitors WHERE status='active'"
            ).fetchall()
            candidates = (dict(r) for r in active)
            mon = next(
                (
                    c for c in candidates
                    if c.get("direction") == direction
                    and _match_ctp_symbol(sym, c.get("symbol") or "")
                ),
                None,
            )
        if not mon:
            return jsonify({"ok": False, "error": "未找到有效监控快照"}), 404

        outcome = place_monitor_exit_orders(
            conn, mon, mode=mode, force=bool(d.get("force")),
        )
        if outcome.get("ok"):
            return jsonify(outcome)
        return jsonify(outcome), 400
    finally:
        conn.close()
@app.route("/api/trading/monitor/dismiss", methods=["POST"])
@login_required
def api_trading_monitor_dismiss():
    """Dismiss a monitor record.

    A pending record is cancelled through ``cancel_pending_monitor`` (trading
    session required); an active record is simply marked closed.
    """
    d = request.get_json(silent=True) or {}
    try:
        monitor_id = int(d.get("monitor_id") or 0)
    except (TypeError, ValueError):
        monitor_id = 0
    if monitor_id <= 0:
        return jsonify({"ok": False, "error": "无效的监控记录"}), 400

    conn = get_db()
    try:
        init_strategy_tables(conn)
        mode = get_trading_mode(get_setting)
        found = conn.execute(
            "SELECT * FROM trade_order_monitors WHERE id=? AND status IN ('active', 'pending')",
            (monitor_id,),
        ).fetchone()
        if not found:
            return jsonify({"ok": False, "error": "记录不存在或已关闭"}), 404
        mon = dict(found)
        is_pending = (mon.get("status") or "").strip().lower() == "pending"

        if not is_pending:
            conn.execute(
                "UPDATE trade_order_monitors SET status='closed' WHERE id=?",
                (monitor_id,),
            )
            conn.commit()
            _push_position_snapshot_async(fast=False)
            return jsonify({"ok": True, "message": "已取消本地止盈止损监控"})

        if not is_trading_session():
            return jsonify({"ok": False, "error": "不在交易时间段,无法撤单"}), 403
        ok, msg = cancel_pending_monitor(conn, mon, mode)
        _push_position_snapshot_async(fast=False)
        return jsonify({"ok": ok, "message": msg})
    finally:
        conn.close()
@app.route("/api/trading/monitor/cancel-open", methods=["POST"])
@login_required
def api_trading_monitor_cancel_open():
    """Cancel a pending opening order through ``cancel_pending_monitor``.

    Requires a CTP connection and an open trading session; responds 404 when
    the monitor is not in ``pending`` status.
    """
    body = request.get_json(silent=True) or {}
    raw_id = body.get("monitor_id") or 0
    try:
        monitor_id = int(raw_id)
    except (TypeError, ValueError):
        monitor_id = 0
    if monitor_id <= 0:
        return jsonify({"ok": False, "error": "无效的委托记录"}), 400

    db = get_db()
    try:
        init_strategy_tables(db)
        mode = get_trading_mode(get_setting)
        if not ctp_status(mode).get("connected"):
            return jsonify({"ok": False, "error": "请先连接 CTP"}), 400
        if not is_trading_session():
            return jsonify({"ok": False, "error": "不在交易时间段,无法撤单"}), 403
        pending = db.execute(
            "SELECT * FROM trade_order_monitors WHERE id=? AND status='pending'",
            (monitor_id,),
        ).fetchone()
        if not pending:
            return jsonify({"ok": False, "error": "未找到挂单中的开仓委托"}), 404
        ok, msg = cancel_pending_monitor(db, dict(pending), mode)
        _push_position_snapshot_async(fast=False)
        return jsonify({"ok": ok, "message": msg})
    finally:
        db.close()
@app.route("/api/trading/order/cancel", methods=["POST"])
@login_required
def api_trading_order_cancel():
    """Cancel a working counter order by the ``order_id`` in the JSON body.

    Responds 400 for a missing id, a disconnected CTP, or a failed
    cancellation; 403 outside the trading session.
    """
    d = request.get_json(silent=True) or {}
    # Coerce to str first: a client sending a numeric id would otherwise
    # crash on ``.strip()``.
    order_id = str(d.get("order_id") or "").strip()
    if not order_id:
        return jsonify({"ok": False, "error": "无效的委托号"}), 400
    mode = get_trading_mode(get_setting)
    if not ctp_status(mode).get("connected"):
        return jsonify({"ok": False, "error": "请先连接 CTP"}), 400
    if not is_trading_session():
        return jsonify({"ok": False, "error": "不在交易时间段,无法撤单"}), 403
    ok = ctp_cancel_order(mode, order_id)
    # Push regardless of the outcome so clients see the current order state.
    _push_position_snapshot_async(fast=False)
    if not ok:
        return jsonify({"ok": False, "error": "撤单失败,委托可能已成交或已撤销"}), 400
    return jsonify({"ok": True, "message": "撤单已提交"})
@app.route("/api/trading/close", methods=["POST"])
@login_required
def api_trading_close():
    """Close a position with a market order and retire its local monitors.

    Flow: validate the request, resolve the related active monitor (by
    ``monitor_id``, then by symbol/direction), determine the entry price,
    send the closing order, write a manual trade log when CTP is not
    connected, close the monitors, then try to sync trade logs from CTP.

    Responds 400 for invalid input, a ``ValueError`` from the order path, or
    a disconnected CTP when ``source`` is ``ctp``/``program``.
    The database connection is closed on every path.
    """
    d = request.get_json(silent=True) or {}
    source = (d.get("source") or "").strip()
    conn = get_db()
    try:
        init_strategy_tables(conn)
        mode = get_trading_mode(get_setting)
        if not ctp_status(mode).get("connected") and source in ("ctp", "program"):
            return jsonify({"ok": False, "error": "请先连接 CTP"}), 400
        sym = (d.get("symbol_code") or d.get("symbol") or "").strip()
        direction = (d.get("direction") or "long").strip().lower()
        try:
            lots = max(1, int(d.get("lots") or 1))
            price = float(d.get("price") or 0)
        except (TypeError, ValueError):
            return jsonify({"ok": False, "error": "参数无效"}), 400
        if not sym or price <= 0:
            return jsonify({"ok": False, "error": "品种或价格无效"}), 400
        offset = "close_long" if direction == "long" else "close_short"
        capital = _capital(conn)

        # A malformed monitor_id falls back to the symbol/direction lookup,
        # matching the other monitor endpoints.
        try:
            mid = int(d.get("monitor_id") or 0)
        except (TypeError, ValueError):
            mid = 0
        mon = None
        if mid:
            row = conn.execute(
                "SELECT * FROM trade_order_monitors WHERE id=? AND status='active'",
                (mid,),
            ).fetchone()
            if row:
                mon = dict(row)
        if not mon:
            for r in conn.execute(
                "SELECT * FROM trade_order_monitors WHERE status='active'"
            ).fetchall():
                row = dict(r)
                if row.get("direction") != direction:
                    continue
                if _match_ctp_symbol(sym, row.get("symbol") or ""):
                    mon = row
                    break

        # Entry price: monitor first, then the matching CTP position.
        entry = float(mon.get("entry_price") or 0) if mon else 0.0
        if entry <= 0:
            for p in _ctp_positions(mode):
                if int(p.get("lots") or 0) <= 0:
                    continue
                if (p.get("direction") or "long") != direction:
                    continue
                if _match_ctp_symbol(p.get("symbol") or "", sym):
                    entry = float(p.get("avg_price") or price)
                    break

        try:
            execute_order(
                conn, mode=mode, offset=offset, symbol=sym, direction=direction,
                lots=lots, price=price, settings=_settings_dict(),
                order_type="market",
            )
            if not ctp_status(mode).get("connected"):
                # No counter fills to sync from, so record the close locally.
                write_manual_close_trade_log(
                    conn,
                    mon,
                    symbol=sym,
                    direction=direction,
                    lots=lots,
                    close_price=price,
                    entry_price=entry or price,
                    trading_mode=mode,
                    capital=capital,
                    stop_loss=float(mon["stop_loss"]) if mon and mon.get("stop_loss") is not None else None,
                    take_profit=float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None,
                    open_time=(mon.get("open_time") or "") if mon else "",
                    symbol_name=(mon.get("symbol_name") or "") if mon else "",
                    market_code=(mon.get("market_code") or "") if mon else "",
                )
            _close_all_monitors_for_sym_dir(conn, sym, direction)
            conn.commit()
            try:
                from ctp_trade_sync import sync_trade_logs_from_ctp
                sync_trade_logs_from_ctp(conn, mode, capital=capital, trading_mode=mode)
                conn.commit()
            except Exception as exc:
                # Trade-log sync is best effort; the close itself succeeded.
                logger.debug("sync trades after close: %s", exc)
        except ValueError as exc:
            return jsonify({"ok": False, "error": str(exc)}), 400
    finally:
        conn.close()
    # Only the success path reaches here; push after the connection is closed.
    _push_position_snapshot_async()
    return jsonify({"ok": True, "message": "已平仓;交易记录将按柜台成交同步"})
@app.route("/strategy")
@login_required
@_nav("strategy")
def strategy_page():
    """Render the strategy page with the active trend plan, monitors and roll groups.

    The database connection is closed even when a query raises.
    """
    conn = get_db()
    try:
        init_strategy_tables(conn)
        capital = _capital(conn)
        active_trend = conn.execute(
            "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1"
        ).fetchone()
        monitors = conn.execute(
            "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC"
        ).fetchall()
        roll_groups = conn.execute(
            "SELECT * FROM roll_groups WHERE status='active' ORDER BY id DESC"
        ).fetchall()
        active_trend_row = dict(active_trend) if active_trend else None
        if active_trend_row:
            # A plan without a stored period is labelled as "15m".
            active_trend_row["period_label"] = trend_period_label(active_trend_row.get("period") or "15m")
    finally:
        conn.close()
    return render_template(
        "strategy.html",
        capital=capital,
        risk_percent=get_risk_percent(get_setting),
        sizing_mode=get_sizing_mode(get_setting),
        active_trend=active_trend_row,
        monitors=[dict(m) for m in monitors],
        roll_groups=[dict(g) for g in roll_groups],
        trend_periods=trend_strategy_periods(),
    )
@app.route("/strategy/records")
@login_required
def strategy_records_page():
    """Render the strategy records page from ``list_snapshots``.

    The database connection is closed even when the lookup raises.
    """
    conn = get_db()
    try:
        init_strategy_tables(conn)
        trend, roll = list_snapshots(conn)
    finally:
        conn.close()
    return render_template("strategy_records.html", trend_rows=trend, roll_rows=roll)
@app.route("/api/trade/quote")
@login_required
def api_trade_quote():
    """Return quote, tick metrics, held lots and an open-capacity estimate for a symbol.

    Query parameters: ``symbol`` (required) and ``lots`` (defaults to 1;
    unparsable values fall back to 1). Responds 400 when the symbol is
    missing.
    """
    sym = (request.args.get("symbol") or "").strip()
    lots = request.args.get("lots") or "1"
    if not sym:
        return jsonify({"ok": False, "error": "缺少品种"}), 400
    codes = ths_to_codes(sym)
    price = fetch_price(sym, codes.get("market_code", "") if codes else "", codes.get("sina_code", "") if codes else "")
    try:
        lots_f = max(1, int(float(lots)))
    except (TypeError, ValueError):
        lots_f = 1
    mode = get_trading_mode(get_setting)
    metrics = calc_order_tick_metrics(sym, lots_f, price, trading_mode=mode)
    spec = get_contract_spec(sym)
    name = codes.get("name", sym) if codes else sym
    pos_long = pos_short = 0
    ctp_st = ctp_status(mode)
    if ctp_st.get("connected"):
        for p in _ctp_positions(mode):
            if not _match_ctp_symbol(p.get("symbol", ""), sym):
                continue
            # Tolerate missing keys the same way the other position loops do.
            held = int(p.get("lots") or 0)
            if (p.get("direction") or "long") == "long":
                pos_long = held
            else:
                pos_short = held
    max_open = 0
    margin_per_lot = metrics.get("margin_per_lot")
    if margin_per_lot:
        # Open a connection only when needed, and always close it.
        conn = get_db()
        try:
            capital = _capital(conn)
        finally:
            conn.close()
        max_open = int(capital / margin_per_lot)
    return jsonify({
        "ok": True,
        "symbol": sym,
        "name": name,
        "price": price,
        "lots": lots_f,
        "metrics": metrics,
        "exchange": codes.get("exchange", "") if codes else "",
        "pos_long": pos_long,
        "pos_short": pos_short,
        "max_open_long": max_open,
        "max_open_short": max_open,
        "footer_text": (
            f"*{name} 每手{spec['mult']}吨/点 最小变动{metrics['tick_size']} "
            f"每跳{metrics['tick_value_per_lot']}元/手×{lots_f}={metrics['tick_value_total']}"
            f"精度{metrics['price_precision']}位小数"
        ),
    })
@app.route("/api/trade/preview", methods=["POST"])
@login_required
def api_trade_preview():
    """Preview lot size, position metrics and tick metrics for a prospective order.

    Lots come from the configured sizing mode: computed from the fixed amount
    (``MODE_AMOUNT``), the fixed lot setting (``MODE_FIXED``), or the
    request's ``lots`` otherwise. Responds 400 for unparsable prices or a
    sizing error.
    """
    d = request.get_json(silent=True) or {}
    sym = (d.get("symbol") or "").strip()
    direction = (d.get("direction") or "long").strip().lower()
    try:
        entry = float(d.get("entry") or d.get("price") or 0)
        sl = float(d.get("stop_loss") or 0)
        tp = float(d.get("take_profit") or 0)
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "价格参数无效"}), 400
    conn = get_db()
    try:
        capital = _capital(conn)
    finally:
        # Close even if the capital lookup raises.
        conn.close()
    sizing = get_sizing_mode(get_setting)
    margin_pct = get_max_margin_pct(get_setting)
    mode = get_trading_mode(get_setting)
    sizing_info = {}
    if sizing == MODE_AMOUNT:
        lots, err, sizing_info = calc_lots_by_amount(
            entry, sl, direction, get_fixed_amount(get_setting), sym,
            capital=capital, max_margin_pct=margin_pct,
            trading_mode=mode,
        )
        if err:
            return jsonify({"ok": False, "error": err}), 400
    elif sizing == MODE_FIXED:
        lots = get_fixed_lots(get_setting)
    else:
        try:
            lots = max(1, int(d.get("lots") or 1))
        except (TypeError, ValueError):
            lots = 1
    metrics = calc_position_metrics(direction, entry, sl, tp, lots, entry, capital, sym)
    tick = calc_order_tick_metrics(
        sym, lots, entry, direction=direction, trading_mode=mode,
    )
    return jsonify({
        "ok": True, "lots": lots, "sizing_mode": sizing,
        "metrics": metrics, "tick": tick, "capital": capital,
        "sizing_info": sizing_info,
    })
@app.route("/api/trade/order", methods=["POST"])
@login_required
def api_trade_order():
# Manual order endpoint. Reads symbol/offset/direction/lots/price from the JSON
# body, runs the pre-submit checks below, submits through execute_order and,
# for offsets starting with "open", records a pending monitor, reconciles its
# status against CTP and sends notifications.
d = request.get_json(silent=True) or {}
sym = (d.get("symbol") or "").strip()
offset = (d.get("offset") or "open").strip().lower()
direction = (d.get("direction") or "long").strip().lower()
try:
lots = max(1, int(d.get("lots") or 1))
price = float(d.get("price") or 0)
except (TypeError, ValueError):
return jsonify({"ok": False, "error": "手数或价格无效"}), 400
order_type = (d.get("order_type") or d.get("price_type") or "limit").strip().lower()
# A market order without a positive price falls back to a fetched quote.
if order_type == "market" and price <= 0:
codes = ths_to_codes(sym)
price = fetch_price(
sym,
codes.get("market_code", "") if codes else "",
codes.get("sina_code", "") if codes else "",
) or 0
if not sym or price <= 0:
return jsonify({"ok": False, "error": "品种或价格无效"}), 400
conn = get_db()
init_strategy_tables(conn)
mode = get_trading_mode(get_setting)
# Pre-submit checks that follow: monitor sync with CTP, trading session,
# trailing-breakeven requires a stop-loss, risk gate (assert_can_open),
# product scope for the current capital, and CTP connectivity.
# Every rejecting branch closes conn before returning.
if offset.startswith("open"):
_sync_trade_monitors_with_ctp(conn, mode)
if not is_trading_session():
conn.close()
return jsonify({"ok": False, "error": "不在交易时间段"}), 403
if d.get("trailing_be") and not d.get("stop_loss"):
conn.close()
return jsonify({"ok": False, "error": "开启移动保本须填写止损价"}), 400
err = assert_can_open(conn, active_count=_effective_active_position_count(conn, mode))
if err:
conn.close()
return jsonify({"ok": False, "error": err}), 403
scope_err = assert_product_allowed_for_capital(
sym, _capital(conn), ctp_connected=is_ctp_connected(get_setting),
)
if scope_err:
conn.close()
return jsonify({"ok": False, "error": scope_err}), 403
ctp_st = ctp_status(mode)
if not ctp_st.get("connected"):
conn.close()
if get_bridge().connect_in_progress():
return jsonify({"ok": False, "error": "CTP 连接中,请稍候再下单"}), 400
return jsonify({"ok": False, "error": "请先连接 CTP"}), 400
# Position sizing for opens: amount mode recomputes lots from the stop-loss
# distance; fixed mode overrides lots from settings.
sizing = get_sizing_mode(get_setting)
if offset.startswith("open") and sizing == MODE_AMOUNT:
# NOTE(review): this float() is not covered by any try/except; a non-numeric
# stop_loss would raise here while conn is still open.
sl = float(d.get("stop_loss") or 0)
if sl <= 0:
conn.close()
return jsonify({"ok": False, "error": "固定金额模式须填写止损价"}), 400
lots_calc, err, _sizing_info = calc_lots_by_amount(
price, sl, direction, get_fixed_amount(get_setting), sym,
capital=_capital(conn), max_margin_pct=get_max_margin_pct(get_setting),
trading_mode=mode,
)
if err:
conn.close()
return jsonify({"ok": False, "error": err}), 400
lots = lots_calc or lots
elif offset.startswith("open") and sizing == MODE_FIXED:
lots = get_fixed_lots(get_setting)
# Margin usage including the prospective order; the extra_* arguments are
# only populated for opens.
margin_pct = get_max_margin_pct(get_setting)
usage = calc_margin_usage_pct(
_ctp_positions(mode),
_capital(conn),
extra_symbol=sym if offset.startswith("open") else "",
extra_lots=lots if offset.startswith("open") else 0,
extra_price=price if offset.startswith("open") else 0,
extra_direction=direction if offset.startswith("open") else "long",
trading_mode=mode,
)
if offset.startswith("open") and usage > margin_pct:
conn.close()
return jsonify({
"ok": False,
"error": f"保证金占用 {usage:.1f}% 超过上限 {margin_pct:g}%(可在系统设置修改)",
}), 403
# Hard cap on lots per single order.
if lots > DEFAULT_MAX_ORDER_LOTS:
conn.close()
return jsonify({
"ok": False,
"error": f"单笔手数 {lots} 超过上限 {DEFAULT_MAX_ORDER_LOTS},请加大止损距离或改固定手数",
}), 400
# Submission. ValueError/RuntimeError raised from here on map to 400 and any
# other exception to 500 (see the handlers at the end of the function).
try:
result = execute_order(
conn,
mode=mode,
offset=offset,
symbol=sym,
direction=direction,
lots=lots,
price=price,
settings=_settings_dict(),
order_type=order_type,
)
# NOTE(review): same trailing_be/stop_loss condition as the pre-submit check
# above, but evaluated after execute_order has already been called; it looks
# unreachable because the earlier check returns first — confirm and remove.
if offset.startswith("open") and d.get("trailing_be") and not d.get("stop_loss"):
conn.close()
return jsonify({"ok": False, "error": "开启移动保本须填写止损价"}), 400
# For opens: record a 'pending' manual monitor keyed by the returned order id.
# Take-profit is dropped when trailing breakeven is enabled.
if offset.startswith("open"):
from zoneinfo import ZoneInfo
sl = d.get("stop_loss")
trailing_be = 1 if d.get("trailing_be") else 0
tp = None if trailing_be else d.get("take_profit")
open_ts = datetime.now(ZoneInfo("Asia/Shanghai")).strftime("%Y-%m-%d %H:%M:%S")
vt_order_id = str(result.get("order_id") or "")
mid = _upsert_open_monitor(
conn,
sym=sym,
direction=direction,
lots=lots,
price=price,
sl=sl,
tp=tp,
trailing_be=trailing_be,
open_time=open_ts,
monitor_type="manual",
status="pending",
vt_order_id=vt_order_id or None,
order_price=price,
)
conn.commit()
# Best-effort position refresh, then reconcile pending monitors and read back
# this monitor's status: 'active' is treated as filled, 'closed' as rejected.
try:
with _ctp_td_lock:
get_bridge().refresh_positions()
except Exception:
pass
_reconcile_pending(conn, mode, capital=_capital(conn))
st_row = conn.execute(
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
conn.close()
_push_position_snapshot_async(fast=False)
return jsonify({
"ok": False,
"error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)",
"lots": lots,
"filled": False,
}), 400
# Second refresh/reconcile pass when the first one did not report a fill.
# NOTE(review): unlike the first refresh, this call is not wrapped in
# _ctp_td_lock — confirm whether that is intentional.
if not filled:
try:
get_bridge().refresh_positions()
except Exception:
pass
_reconcile_pending(conn, mode, capital=_capital(conn))
st_row = conn.execute(
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
conn.close()
_push_position_snapshot_async(fast=False)
return jsonify({
"ok": False,
"error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)",
"lots": lots,
"filled": False,
}), 400
# On fill: sync the monitor from CTP and cancel any exit orders attached to it;
# a failure while cancelling is logged and does not fail the request.
if filled:
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, capital=_capital(conn),
)
mon_row = conn.execute(
"SELECT * FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
if mon_row and (sl or tp):
try:
ensure_monitor_order_columns(conn)
cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode)
except Exception as exc:
logger.warning("清理旧版止盈止损挂单失败: %s", exc)
conn.commit()
_push_position_snapshot_async(fast=False)
# User-facing message: filled vs. still pending (timeout shown in minutes).
msg = (
f"开仓成功 · {lots} 手"
if filled
else (
f"委托已提交 · {lots} 手挂单中"
f"({get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)"
)
)
conn.commit()
# Notifications: a detailed open notice when a positive stop-loss was given,
# otherwise a short WeChat message; pending opens and non-open offsets get
# their own short messages.
if offset.startswith("open"):
from db_conn import DB_PATH
from ai_worker import schedule_ai_event_analysis
from trade_notify import notify_manual_open_filled
if filled:
# NOTE(review): this float() runs after the order was submitted; a ValueError
# here is caught by the handler below and reported to the client as a 400.
open_sl = float(d.get("stop_loss") or 0) if d.get("stop_loss") else None
open_tp = None if d.get("trailing_be") else d.get("take_profit")
if open_tp is not None:
try:
open_tp = float(open_tp)
except (TypeError, ValueError):
open_tp = None
codes = ths_to_codes(sym) or {}
if open_sl and open_sl > 0:
notify_manual_open_filled(
send_wechat=send_wechat_msg,
get_setting=get_setting,
mode_label=trading_mode_label(get_setting),
sym=sym,
symbol_name=codes.get("name") or sym,
direction=direction,
entry=price,
sl=open_sl,
tp=open_tp,
lots=lots,
capital=_capital(conn),
order_id=str(result.get("order_id") or ""),
trailing_be=bool(d.get("trailing_be")),
be_tick_buffer=get_trailing_be_tick_buffer(get_setting),
schedule_ai_fn=schedule_ai_event_analysis,
db_path=DB_PATH,
)
else:
send_wechat_msg(
f"{trading_mode_label(get_setting)} 开仓 {sym} {direction} {lots}手 @{price}"
)
elif not filled:
send_wechat_msg(
f"委托已提交 · {sym} {direction} {lots}手挂单中"
f"({get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)"
)
elif not offset.startswith("open"):
send_wechat_msg(
f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}"
)
conn.close()
_push_position_snapshot_async(fast=False)
# 'message' and 'filled' are only meaningful for opens; other offsets get a
# generic message and filled=None.
return jsonify({
"ok": True,
"result": result,
"lots": lots,
"message": msg if offset.startswith("open") else "委托已提交柜台",
"filled": filled if offset.startswith("open") else None,
})
except (ValueError, RuntimeError) as exc:
conn.close()
return jsonify({"ok": False, "error": str(exc)}), 400
except Exception as exc:
conn.close()
return jsonify({"ok": False, "error": str(exc)}), 500
@app.route("/api/ctp/connect", methods=["POST"])
@login_required
def api_ctp_connect():
    """Start (or force-restart) a CTP connection and report the outcome.

    Responds 400 when auto-connect is disabled, when the login is in cooldown,
    or when the connection was not started; otherwise 200 with either the
    connected state or a ``connecting`` flag.
    """
    from vnpy_bridge import ctp_start_connect
    from ctp_settings import CTP_DISABLED_HINT

    if not is_ctp_auto_connect_enabled(get_setting):
        disabled_mode = get_trading_mode(get_setting)
        disabled_body = {
            "ok": False,
            "disabled": True,
            "error": CTP_DISABLED_HINT,
            "status": ctp_status(disabled_mode),
        }
        return jsonify(disabled_body), 400

    mode = get_trading_mode(get_setting)
    payload = request.get_json(silent=True) or {}
    info = ctp_start_connect(mode, force=bool(payload.get("force")))
    status = info.get("status") or ctp_status(mode)

    if status.get("connected"):
        account = _ctp_account(mode)
        return jsonify({"ok": True, "status": status, "account": account})

    account = {}
    if info.get("connecting") or info.get("started"):
        return jsonify({
            "ok": True,
            "connecting": True,
            "status": status,
            "account": account,
        })

    # Remaining outcomes are failures; cooldown gets its own flag and fallback text.
    failure = {"ok": False}
    if info.get("cooldown"):
        failure["cooldown"] = True
        failure["error"] = status.get("last_error") or "CTP 登录冷却中"
    else:
        failure["error"] = status.get("last_error") or "CTP 连接未启动"
    failure["status"] = status
    failure["account"] = account
    return jsonify(failure), 400
@app.route("/api/ctp/status")
@login_required
def api_ctp_status():
    """Report the CTP connection status and, when connected, the account data.

    A failure while reading the account yields an empty account dict rather
    than an error response.
    """
    mode = get_trading_mode(get_setting)
    status = ctp_status(mode)

    def _account_or_empty():
        # Account lookup is best-effort for this endpoint.
        try:
            return _ctp_account(mode)
        except Exception:
            return {}

    account = _account_or_empty() if status.get("connected") else {}
    return jsonify({"ok": True, "status": status, "account": account})
@app.route("/api/account_snapshot")
@login_required
def api_account_snapshot():
    """Return capital, trading/sizing mode, risk status and CTP account/positions.

    CTP account and position data are only queried when CTP reports connected;
    otherwise they are returned empty.
    """
    db = get_db()
    try:
        init_strategy_tables(db)
        mode = get_trading_mode(get_setting)
        status = ctp_status(mode)
        capital = _capital(db)
        active = _effective_active_position_count(db, mode)
        risk = get_risk_status(db, active_count=active)
        db.commit()

        if status.get("connected"):
            account = _ctp_account(mode)
        else:
            account = {}
        if status.get("connected"):
            positions = _ctp_positions(mode)
        else:
            positions = []

        snapshot = {
            "capital": capital,
            "trading_mode": mode,
            "trading_mode_label": trading_mode_label(get_setting),
            "sizing_mode": get_sizing_mode(get_setting),
            "risk_status": risk,
            "ctp_status": status,
            "ctp_account": account,
            "positions": positions,
        }
        return jsonify(snapshot)
    finally:
        db.close()
@app.route("/api/recommend/list")
@login_required
def api_recommend_list():
    """Serve the recommend payload from the database cache; no quotes are fetched here."""
    db = get_db()
    try:
        body = {"ok": True}
        body.update(_recommend_payload(db))
        return jsonify(body)
    finally:
        db.close()
@app.route("/api/recommend/stream")
@login_required
def api_recommend_stream():
    """Stream recommend updates as server-sent events.

    The stream opens with the current cached payload, then relays messages
    from ``recommend_hub``; a heartbeat comment is emitted whenever no message
    arrives within 25 seconds.
    """
    from queue import Empty

    sse_headers = {
        "Cache-Control": "no-cache",
        "Connection": "keep-alive",
        "X-Accel-Buffering": "no",
    }

    def generate():
        inbox = recommend_hub.subscribe()
        try:
            db = get_db()
            try:
                initial = _recommend_payload(db)
            finally:
                db.close()
            first_event = {"ok": True}
            first_event.update(initial)
            yield sse_format("recommend", first_event)
            while True:
                try:
                    item = inbox.get(timeout=25)
                    yield sse_format(item["event"], item["data"])
                except Empty:
                    yield ": heartbeat\n\n"
        finally:
            # Always release the subscription when the generator ends.
            recommend_hub.unsubscribe(inbox)

    return Response(
        stream_with_context(generate()),
        mimetype="text/event-stream",
        headers=sse_headers,
    )
@app.route("/api/recommend/refresh", methods=["POST"])
@login_required
def api_recommend_refresh():
    """Manually trigger one recommend refresh (results are still written to the database).

    After refreshing, the current payload is broadcast on ``recommend_hub`` and
    returned together with the number of refreshed rows.
    """
    conn = get_db()
    try:
        init_strategy_tables(conn)
        capital = _recommend_capital(conn)
        mode = get_trading_mode(get_setting)
        # Read the margin cap once; the original read it a second time into an
        # unused local.
        max_pct = get_max_margin_pct(get_setting)
        rows = refresh_recommend_cache(
            conn, capital, _main_quote, trading_mode=mode,
            max_margin_pct=max_pct,
        )
        payload = _recommend_payload(conn)
        recommend_hub.broadcast("recommend", {"ok": True, **payload})
        return jsonify({"ok": True, "count": len(rows), **payload})
    finally:
        conn.close()
@app.route("/api/strategy/trend/preview", methods=["POST"])
@login_required
def api_trend_preview():
    """Compute a trend-pullback plan preview for the requested symbol.

    Responds 400 when an active trend plan already exists, when no live price
    can be fetched, when a numeric field in the request is malformed, or when
    ``compute_trend_plan_futures`` reports an error.
    """
    d = request.get_json(silent=True) or {}
    sym = (d.get("symbol") or "").strip()
    conn = get_db()
    try:
        # try/finally so the connection is released even if a lookup raises.
        if conn.execute("SELECT id FROM trend_pullback_plans WHERE status='active'").fetchone():
            return jsonify({"ok": False, "error": "已有运行中趋势计划"}), 400
        capital = _capital(conn)
        codes = ths_to_codes(sym)
        price = fetch_price(
            sym,
            codes.get("market_code", "") if codes else "",
            codes.get("sina_code", "") if codes else "",
        )
    finally:
        conn.close()
    if not price:
        return jsonify({"ok": False, "error": "无法获取现价"}), 400
    # Client-supplied numbers are parsed up front so malformed input yields a
    # 400 instead of an unhandled exception.
    try:
        stop_loss = float(d.get("stop_loss") or 0)
        add_upper = float(d.get("add_upper") or 0)
        take_profit = float(d.get("take_profit") or 0)
        risk_percent = float(d.get("risk_percent") or get_risk_percent(get_setting))
        dca_legs = int(d.get("dca_legs") or 5)
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "参数格式无效"}), 400
    plan, err = compute_trend_plan_futures(
        direction=d.get("direction") or "long",
        stop_loss=stop_loss,
        add_upper=add_upper,
        take_profit=take_profit,
        risk_percent=risk_percent,
        capital=capital,
        live_price=price,
        ths_code=sym,
        dca_legs=dca_legs,
    )
    if err:
        return jsonify({"ok": False, "error": err}), 400
    period = normalize_trend_period(d.get("period"))
    sym_name = (d.get("symbol_name") or "").strip()
    if not sym_name and codes:
        sym_name = codes.get("name") or sym
    plan = enrich_trend_plan_preview(
        plan, symbol=sym, symbol_name=sym_name, period=period,
    )
    return jsonify({"ok": True, "plan": plan})
@app.route("/api/strategy/trend/execute", methods=["POST"])
@login_required
def api_trend_execute():
    """Open the first leg of a trend-pullback plan and persist the plan row.

    Flow: risk gate (``assert_can_open``), product-scope check, price lookup,
    plan computation, first-leg order via ``execute_order``, then an INSERT
    into ``trend_pullback_plans`` and a WeChat notification.

    Responds 403 when a gate rejects the request and 400 for malformed input,
    a missing price, a plan error or an order error.
    """
    d = request.get_json(silent=True) or {}
    sym = (d.get("symbol") or "").strip()
    conn = get_db()
    try:
        init_strategy_tables(conn)
        err = assert_can_open(conn)
        if err:
            return jsonify({"ok": False, "error": err}), 403
        capital = _capital(conn)
        scope_err = assert_product_allowed_for_capital(
            sym, capital, ctp_connected=is_ctp_connected(get_setting),
        )
        if scope_err:
            return jsonify({"ok": False, "error": scope_err}), 403
        codes = ths_to_codes(sym)
        quoted = fetch_price(
            sym,
            codes.get("market_code", "") if codes else "",
            codes.get("sina_code", "") if codes else "",
        )
        try:
            # Resolve ONE price (quote, else client-supplied live_price) and use
            # it for the plan, the order and the stored average entry. The
            # original passed the raw quote (possibly None) to execute_order.
            price = quoted or float(d.get("live_price") or 0)
            plan_kwargs = {
                "direction": d.get("direction") or "long",
                "stop_loss": float(d.get("stop_loss") or 0),
                "add_upper": float(d.get("add_upper") or 0),
                "take_profit": float(d.get("take_profit") or 0),
                "risk_percent": float(d.get("risk_percent") or get_risk_percent(get_setting)),
                "capital": capital,
                "live_price": price,
                "ths_code": sym,
            }
            # Honour dca_legs when the client sends it, as the preview endpoint
            # does; when absent the callee's own default still applies.
            if d.get("dca_legs"):
                plan_kwargs["dca_legs"] = int(d["dca_legs"])
        except (TypeError, ValueError):
            return jsonify({"ok": False, "error": "参数格式无效"}), 400
        if not price or price <= 0:
            return jsonify({"ok": False, "error": "无法获取现价"}), 400
        plan, perr = compute_trend_plan_futures(**plan_kwargs)
        if perr:
            return jsonify({"ok": False, "error": perr}), 400
        period = normalize_trend_period(d.get("period"))
        sym_name = (d.get("symbol_name") or "").strip()
        if not sym_name and codes:
            sym_name = codes.get("name") or sym
        plan = enrich_trend_plan_preview(
            plan, symbol=sym, symbol_name=sym_name, period=period,
        )
        mode = get_trading_mode(get_setting)
        # NOTE(review): unlike the preview endpoint, no check for an already
        # active trend plan is made here — confirm whether that is intended.
        try:
            execute_order(
                conn, mode=mode, offset="open", symbol=sym,
                direction=plan["direction"], lots=plan["first_lots"], price=price,
                settings=_settings_dict(),
            )
        except (ValueError, RuntimeError) as exc:
            # Same error mapping as the manual order endpoint.
            return jsonify({"ok": False, "error": str(exc)}), 400
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cur = conn.execute(
            """INSERT INTO trend_pullback_plans (
status, symbol, symbol_name, direction, stop_loss, add_upper, take_profit,
risk_percent, capital_snapshot, plan_margin, target_lots, first_lots, remainder_lots,
dca_legs, leg_amounts_json, grid_prices_json, first_order_done, avg_entry_price,
lots_open, opened_at, period
) VALUES ('active',?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1,?,?,?,?)""",
            (
                sym, sym_name or (codes.get("name", sym) if codes else sym), plan["direction"],
                plan["stop_loss"], plan["add_upper"], plan["take_profit"],
                plan["risk_percent"], plan["capital_snapshot"], plan["plan_margin"],
                plan["target_lots"], plan["first_lots"], plan["remainder_lots"],
                plan["dca_legs"], plan["leg_amounts_json"], plan["grid_prices_json"],
                price, plan["first_lots"], now, plan["period"],
            ),
        )
        plan_id = cur.lastrowid
        conn.commit()
    finally:
        # Single close point: covers every early return and unexpected exception.
        conn.close()
    send_wechat_msg(f"趋势回调首仓 {sym} {plan['first_lots']}手")
    return jsonify({"ok": True, "plan_id": plan_id, "plan": plan})
def _apply_roll_margin_cap(
preview: dict,
*,
conn,
mode: str,
mon: dict,
capital: float,
) -> tuple[dict, Optional[str]]:
"""滚仓:风险算手数后再按滚仓保证金上限收紧。"""
if not preview:
return preview, "预览无效"
sym = mon["symbol"]
direction = (mon.get("direction") or "long").strip().lower()
price = float(preview.get("add_price") or 0)
qty_existing = float(mon.get("lots") or 0)
entry_existing = float(mon.get("entry_price") or 0)
mult = int(get_contract_spec(sym).get("mult") or 1)
roll_pct = get_roll_max_margin_pct(get_setting)
add_lots = int(preview.get("add_lots") or 0)
positions = _ctp_positions(mode, refresh_if_empty=False)
capped, usage = cap_lots_for_margin_budget(
positions, capital, sym, direction, price, add_lots, roll_pct, trading_mode=mode,
)
if capped < 1:
return preview, f"滚仓后保证金占用将超过上限 {roll_pct:g}%"
out = dict(preview)
if capped < add_lots:
out["add_lots"] = capped
out["qty_after"] = int(qty_existing + capped)
out["avg_entry_after"] = round(
avg_entry_after_add(qty_existing, entry_existing, capped, price), 4,
)
sl = float(out.get("new_stop_loss") or 0)
tp = float(out.get("initial_take_profit") or 0)
new_avg = float(out["avg_entry_after"])
new_qty = float(out["qty_after"])
if direction == "long":
out["loss_at_sl"] = round((new_avg - sl) * new_qty * mult, 2)
out["reward_at_tp"] = round((tp - new_avg) * new_qty * mult, 2)
else:
out["loss_at_sl"] = round((sl - new_avg) * new_qty * mult, 2)
out["reward_at_tp"] = round((new_avg - tp) * new_qty * mult, 2)
out["margin_capped"] = True
out["margin_cap_note"] = (
f"按滚仓保证金上限 {roll_pct:g}% 收紧:"
f"风险算 {add_lots} 手 → 实际 {capped}"
)
out["margin_usage_pct"] = round(usage, 2)
out["roll_max_margin_pct"] = roll_pct
return out, None
@app.route("/api/strategy/roll/preview", methods=["POST"])
@login_required
def api_roll_preview():
    """Preview adding to (rolling) an active monitored position.

    Looks up the active monitor by ``monitor_id``, sizes the add-on through
    ``preview_roll`` and then tightens it with ``_apply_roll_margin_cap``.
    Responds 400 when the monitor is missing, a numeric field is malformed,
    or either step reports an error.
    """
    d = request.get_json(silent=True) or {}
    try:
        mon_id = int(d.get("monitor_id") or 0)
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "无有效持仓监控"}), 400
    # One connection for the whole request. The original closed it early,
    # opened a second one for _capital() that was never closed, and passed the
    # closed handle on to the margin-cap helper.
    conn = get_db()
    try:
        mon = conn.execute(
            "SELECT * FROM trade_order_monitors WHERE id=? AND status='active'", (mon_id,),
        ).fetchone()
        if not mon:
            return jsonify({"ok": False, "error": "无有效持仓监控"}), 400
        sym = mon["symbol"]
        spec = get_contract_spec(sym)
        capital = _capital(conn)
        # Parse client-supplied numbers up front so malformed input yields a 400.
        try:
            new_stop_loss = float(d.get("new_stop_loss") or 0)
            risk_percent = float(d.get("risk_percent") or 2)
            add_price = float(d.get("add_price") or mon["entry_price"])
            legs_done = int(d.get("legs_done") or 0)
        except (TypeError, ValueError):
            return jsonify({"ok": False, "error": "参数格式无效"}), 400
        preview, err = preview_roll(
            direction=mon["direction"],
            symbol=sym,
            qty_existing=float(mon["lots"]),
            entry_existing=float(mon["entry_price"]),
            initial_take_profit=float(mon["take_profit"] or 0),
            add_mode=d.get("add_mode") or "market",
            new_stop_loss=new_stop_loss,
            risk_percent=risk_percent,
            capital_base=capital,
            mult=spec["mult"],
            add_price=add_price,
            fib_upper=d.get("fib_upper"),
            fib_lower=d.get("fib_lower"),
            legs_done=legs_done,
        )
        if err:
            return jsonify({"ok": False, "error": err}), 400
        preview, merr = _apply_roll_margin_cap(
            preview, conn=conn, mode=get_trading_mode(get_setting), mon=dict(mon), capital=capital,
        )
        if merr:
            return jsonify({"ok": False, "error": merr}), 400
        return jsonify({"ok": True, "preview": preview})
    finally:
        conn.close()
@app.route("/api/strategy/roll/execute", methods=["POST"])
@login_required
def api_roll_execute():
    """Execute a roll (add-on) for an active monitored position.

    Recomputes the roll preview, applies the roll margin cap, submits the
    add-on order, then updates the monitor (lots, average entry, stop-loss)
    and records the leg in ``roll_groups`` / ``roll_legs``.

    Responds 400 when the monitor is missing, a trend plan is active, input
    is malformed, or sizing or the order fails.
    """
    d = request.get_json(silent=True) or {}
    try:
        mon_id = int(d.get("monitor_id") or 0)
    except (TypeError, ValueError):
        return jsonify({"ok": False, "error": "无有效持仓监控"}), 400
    conn = get_db()
    try:
        init_strategy_tables(conn)
        mon = conn.execute(
            "SELECT * FROM trade_order_monitors WHERE id=? AND status='active'", (mon_id,),
        ).fetchone()
        if not mon:
            return jsonify({"ok": False, "error": "无有效持仓监控"}), 400
        if conn.execute("SELECT id FROM trend_pullback_plans WHERE status='active'").fetchone():
            return jsonify({"ok": False, "error": "趋势回调运行中,不可滚仓"}), 400
        sym = mon["symbol"]
        spec = get_contract_spec(sym)
        capital = _capital(conn)
        mode = get_trading_mode(get_setting)
        add_mode = d.get("add_mode") or "market"
        # Parse client-supplied numbers up front so malformed input yields a
        # 400 instead of an unhandled exception with the connection left open.
        try:
            risk_percent = float(d.get("risk_percent") or 2)
            roll_kwargs = {
                "new_stop_loss": float(d.get("new_stop_loss") or 0),
                "risk_percent": risk_percent,
                "add_price": float(d.get("add_price") or mon["entry_price"]),
            }
            # Forward the optional inputs the preview endpoint also uses, only
            # when the client supplied them, so execution matches the preview
            # the user saw; when absent the callee's defaults still apply.
            for key in ("fib_upper", "fib_lower"):
                if d.get(key) is not None:
                    roll_kwargs[key] = d.get(key)
            if d.get("legs_done"):
                roll_kwargs["legs_done"] = int(d["legs_done"])
        except (TypeError, ValueError):
            return jsonify({"ok": False, "error": "参数格式无效"}), 400
        prev, err = preview_roll(
            direction=mon["direction"],
            symbol=sym,
            qty_existing=float(mon["lots"]),
            entry_existing=float(mon["entry_price"]),
            initial_take_profit=float(mon["take_profit"] or 0),
            add_mode=add_mode,
            capital_base=capital,
            mult=spec["mult"],
            **roll_kwargs,
        )
        if err:
            return jsonify({"ok": False, "error": err}), 400
        prev, merr = _apply_roll_margin_cap(
            prev, conn=conn, mode=mode, mon=dict(mon), capital=capital,
        )
        if merr:
            return jsonify({"ok": False, "error": merr}), 400
        price = float(prev["add_price"])
        add_lots = int(prev["add_lots"])
        try:
            execute_order(
                conn, mode=mode, offset="open", symbol=sym,
                direction=mon["direction"], lots=add_lots, price=price,
                settings=_settings_dict(),
            )
        except (ValueError, RuntimeError) as exc:
            # Same error mapping as the manual order endpoint.
            return jsonify({"ok": False, "error": str(exc)}), 400
        new_lots = int(mon["lots"]) + add_lots
        new_avg = prev["avg_entry_after"]
        new_sl = prev["new_stop_loss"]
        conn.execute(
            "UPDATE trade_order_monitors SET lots=?, entry_price=?, stop_loss=? WHERE id=?",
            (new_lots, new_avg, new_sl, mon_id),
        )
        grp = conn.execute(
            "SELECT * FROM roll_groups WHERE order_monitor_id=? AND status='active'",
            (mon_id,),
        ).fetchone()
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        if grp:
            # Existing roll group: bump the leg counter and move the stop.
            gid = grp["id"]
            leg_n = int(grp["leg_count"] or 0) + 1
            conn.execute(
                "UPDATE roll_groups SET leg_count=?, current_stop_loss=?, updated_at=? WHERE id=?",
                (leg_n, new_sl, now, gid),
            )
        else:
            # First roll on this monitor: create the group with leg_count = 1.
            cur = conn.execute(
                """INSERT INTO roll_groups (
order_monitor_id, symbol, direction, initial_take_profit, initial_stop_loss,
current_stop_loss, risk_percent, leg_count, status, created_at, updated_at
) VALUES (?,?,?,?,?,?,?,1,'active',?,?)""",
                (mon_id, sym, mon["direction"], mon["take_profit"], mon["stop_loss"], new_sl,
                 risk_percent, now, now),
            )
            gid = cur.lastrowid
            leg_n = 1
        conn.execute(
            """INSERT INTO roll_legs (roll_group_id, leg_index, add_mode, fill_price, lots, new_stop_loss, status, created_at)
VALUES (?,?,?,?,?,?, 'filled', ?)""",
            (gid, leg_n, add_mode, price, add_lots, new_sl, now),
        )
        conn.commit()
        return jsonify({"ok": True, "preview": prev})
    finally:
        # Single close point for every return path and unexpected exception.
        conn.close()
@app.route("/api/strategy/trend/stop", methods=["POST"])
@login_required
def api_trend_stop():
    """Manually stop an active trend-pullback plan.

    Attempts to close the plan's open lots, marks the plan ``stopped_manual``,
    saves a snapshot and notifies the risk module of a user-initiated close.
    Responds 404 when no active plan matches ``plan_id``.
    """
    d = request.get_json(silent=True) or {}
    try:
        plan_id = int(d.get("plan_id") or 0)
    except (TypeError, ValueError):
        # A malformed id cannot match any plan.
        return jsonify({"ok": False, "error": "计划不存在"}), 404
    conn = get_db()
    try:
        plan = conn.execute(
            "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (plan_id,),
        ).fetchone()
        if not plan:
            return jsonify({"ok": False, "error": "计划不存在"}), 404
        mode = get_trading_mode(get_setting)
        price = fetch_price(plan["symbol"]) or float(plan["avg_entry_price"] or 0)
        try:
            if int(plan["lots_open"] or 0) > 0:
                execute_order(
                    conn, mode=mode, offset="close", symbol=plan["symbol"],
                    direction=plan["direction"], lots=int(plan["lots_open"]), price=price,
                    settings=_settings_dict(),
                )
        except ValueError as exc:
            # Best-effort close: the plan is still marked stopped below, but the
            # failure is now logged instead of being silently discarded.
            logger.warning("trend stop: close order failed for plan %s: %s", plan_id, exc)
        conn.execute(
            "UPDATE trend_pullback_plans SET status='stopped_manual', message=?, opened_at=opened_at WHERE id=?",
            ("手动结束", plan_id),
        )
        save_snapshot(
            conn, strategy_type=STRATEGY_TREND, source_id=plan_id,
            symbol=plan["symbol"], direction=plan["direction"], result_label="手动结束",
            payload=dict(plan), opened_at=plan["opened_at"] or "",
        )
        on_user_initiated_close(conn, trading_day=trading_day_label())
        conn.commit()
        return jsonify({"ok": True})
    finally:
        conn.close()
def check_trend_plans(app_ref):
    """Background pass over active trend plans: take-profit exits and DCA add-ons.

    For each active plan with a live price:
    * when the take-profit is hit, a close order is attempted, the plan is
      marked ``stopped_tp`` and a snapshot is saved;
    * otherwise, when the next grid level is reached, the next add-on leg is
      opened and the plan's leg count, open lots and average entry are updated.

    ``app_ref`` is accepted for the caller's convenience and is not used here.
    """
    conn = get_db()
    try:
        init_strategy_tables(conn)
        rows = conn.execute("SELECT * FROM trend_pullback_plans WHERE status='active'").fetchall()
        mode = get_trading_mode(get_setting)
        for plan in rows:
            sym = plan["symbol"]
            price = fetch_price(sym)
            if not price:
                continue
            direction = plan["direction"]
            tp = float(plan["take_profit"] or 0)
            if tp > 0:
                hit_tp = (direction == "long" and price >= tp) or (direction == "short" and price <= tp)
                if hit_tp:
                    try:
                        execute_order(
                            conn, mode=mode, offset="close", symbol=sym, direction=direction,
                            lots=int(plan["lots_open"] or 0), price=price, settings=_settings_dict(),
                        )
                    except ValueError as exc:
                        # Best-effort close: the plan is still marked stopped
                        # below, but the failure is no longer silent.
                        logger.warning("trend take-profit close failed for %s: %s", sym, exc)
                    conn.execute(
                        "UPDATE trend_pullback_plans SET status='stopped_tp', message=? WHERE id=?",
                        ("程序止盈", plan["id"]),
                    )
                    save_snapshot(
                        conn, strategy_type=STRATEGY_TREND, source_id=plan["id"],
                        symbol=sym, direction=direction, result_label="止盈",
                        payload=dict(plan), opened_at=plan["opened_at"] or "",
                    )
                    send_wechat_msg(f"趋势回调止盈 {sym}")
                    continue
            try:
                grid = json.loads(plan["grid_prices_json"] or "[]")
                legs = json.loads(plan["leg_amounts_json"] or "[]")
            except Exception:
                # Unreadable grid data disables add-ons for this plan.
                grid, legs = [], []
            done = int(plan["legs_done"] or 0)
            if done < len(grid) and done < len(legs):
                level = float(grid[done])
                if trend_dca_level_reached(direction, price, level):
                    add_lots = int(legs[done])
                    try:
                        execute_order(
                            conn, mode=mode, offset="open", symbol=sym, direction=direction,
                            lots=add_lots, price=price, settings=_settings_dict(),
                        )
                        held = int(plan["lots_open"] or 0)
                        new_open = held + add_lots
                        old_avg = float(plan["avg_entry_price"] or price)
                        # Lot-weighted average entry after the add-on.
                        new_avg = (old_avg * held + price * add_lots) / new_open if new_open else price
                        conn.execute(
                            """UPDATE trend_pullback_plans SET legs_done=?, lots_open=?, avg_entry_price=? WHERE id=?""",
                            (done + 1, new_open, new_avg, plan["id"]),
                        )
                        send_wechat_msg(f"趋势回调补仓 {sym} +{add_lots}手 @档位{done+1}")
                    except ValueError as exc:
                        # The leg stays pending and is retried on the next pass.
                        logger.warning("trend add-on order failed for %s: %s", sym, exc)
        conn.commit()
    finally:
        # Release the connection even if a plan raises unexpectedly.
        conn.close()
app._check_trend_plans = check_trend_plans
def _execute_key_breakout(conn, row, bar, break_side):
"""Key-level box/convergence monitor: auto-open at market after a 5m close breakout."""
# Returns a (success, message) tuple. `row` is read with .get() (monitor
# config) and `bar` supplies "time" and "close". Every outcome is also
# reported through send_wechat_msg via the _notify helper below.
from key_monitor_lib import (
TYPE_BOX,
calc_breakout_sl_tp,
format_auto_breakout_msg,
normalize_monitor_type,
resolve_order_direction,
)
sym = (row.get("symbol") or "").strip()
bar_time = str(bar.get("time") or "")[:19]
monitor_type = normalize_monitor_type(row.get("monitor_type") or "")
trade_mode = row.get("trade_mode") or "顺势"
direction = resolve_order_direction(break_side, trade_mode)
trailing_be = int(row.get("trailing_be") or 0)
# Risk/reward defaults to 3 with trailing breakeven and 2 without; with
# trailing breakeven it is also floored at 3.
try:
rr = float(row.get("risk_reward") or (3 if trailing_be else 2))
except (TypeError, ValueError):
rr = 3.0 if trailing_be else 2.0
if trailing_be and rr < 3:
rr = 3.0
# Sends the formatted breakout message; entry/sl/tp/lots default to 0 when
# not supplied.
def _notify(ok: bool, detail: str, **kw):
send_wechat_msg(format_auto_breakout_msg(
row,
break_side=break_side,
direction=direction,
entry=kw.get("entry", 0),
sl=kw.get("sl", 0),
tp=kw.get("tp", 0),
lots=kw.get("lots", 0),
bar_time=bar_time,
ok=ok,
detail=detail,
))
# Box monitors configured with an explicit long/short direction reject a
# breakout that resolves to the opposite direction.
if monitor_type == TYPE_BOX:
cfg_dir = (row.get("direction") or "").strip().lower()
if cfg_dir in ("long", "short") and direction != cfg_dir:
dir_cn = "做多" if cfg_dir == "long" else "做空"
_notify(False, f"突破方向与上方向({dir_cn})不一致", entry=0, sl=0, tp=0, lots=0)
return False, "突破方向与上方向不一致"
# Any exception raised below is logged, notified and returned as a failure
# (see the handler at the end).
try:
init_strategy_tables(conn)
mode = get_trading_mode(get_setting)
if not ctp_status(mode).get("connected"):
_notify(False, "CTP 未连接")
return False, "CTP 未连接"
if not is_trading_session():
_notify(False, "非交易时段")
return False, "非交易时段"
# Entry price is the bar's close.
try:
entry = float(bar.get("close") or 0)
except (TypeError, ValueError):
_notify(False, "K 线收盘价无效")
return False, "K 线收盘价无效"
if entry <= 0:
_notify(False, "K 线收盘价无效")
return False, "K 线收盘价无效"
sl, tp = calc_breakout_sl_tp(
sym=sym, direction=direction, entry=entry, bar=bar, risk_reward=rr,
)
err = assert_can_open(conn, active_count=_effective_active_position_count(conn, mode))
if err:
_notify(False, err, entry=entry, sl=sl, tp=tp, lots=0)
return False, err
# Lots are sized by risk percent, with the max-margin setting passed along.
capital = _capital(conn)
lots, lot_err = calc_lots_by_risk(
entry, sl, direction, capital, get_risk_percent(get_setting), sym,
max_margin_pct=get_max_margin_pct(get_setting), trading_mode=mode,
)
if lot_err or not lots:
msg = lot_err or "手数计算失败"
_notify(False, msg, entry=entry, sl=sl, tp=tp, lots=0)
return False, msg
result = execute_order(
conn,
mode=mode,
offset="open",
symbol=sym,
direction=direction,
lots=lots,
price=entry,
settings=_settings_dict(),
order_type="market",
)
# Record a 'pending' monitor for the order; open time is the bar time when
# available, otherwise the current local time.
open_ts = bar_time.replace("T", " ") if bar_time else datetime.now().strftime("%Y-%m-%d %H:%M:%S")
vt_order_id = str(result.get("order_id") or "")
mid = _upsert_open_monitor(
conn,
sym=sym,
direction=direction,
lots=lots,
price=entry,
sl=sl,
tp=tp,
trailing_be=trailing_be,
open_time=open_ts,
monitor_type=monitor_type,
status="pending",
vt_order_id=vt_order_id or None,
order_price=entry,
)
# Reconcile and read back the monitor status: 'active' is treated as filled,
# 'closed' as rejected.
_reconcile_pending(conn, mode, capital=capital)
st_row = conn.execute(
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
_notify(False, "委托被柜台拒绝或撤销", entry=entry, sl=sl, tp=tp, lots=lots)
return False, "委托被拒绝"
if filled:
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, capital=capital,
)
conn.commit()
# Filled orders get the detailed open notification; otherwise a
# "submitted, awaiting fill" notice is sent.
if filled:
from db_conn import DB_PATH
from ai_worker import schedule_ai_event_analysis
from trade_notify import notify_key_breakout_open
notify_key_breakout_open(
send_wechat=send_wechat_msg,
get_setting=get_setting,
mode_label=trading_mode_label(get_setting),
row=row,
break_side=break_side,
bar_time=bar_time,
direction=direction,
entry=entry,
sl=sl,
tp=tp,
lots=lots,
capital=capital,
order_id=vt_order_id,
schedule_ai_fn=schedule_ai_event_analysis,
db_path=DB_PATH,
)
else:
_notify(True, "委托已提交,待成交", entry=entry, sl=sl, tp=tp, lots=lots)
_push_position_snapshot_async(fast=False)
# The conditional expression binds tighter than the tuple comma, so this
# returns (True, "已下单") or (True, "委托已提交").
return True, "已下单" if filled else "委托已提交"
except Exception as exc:
logger.warning("key breakout auto order: %s", exc)
_notify(False, str(exc))
return False, str(exc)
# Expose the handler on the app object.
app._execute_key_breakout = _execute_key_breakout
@app.route("/settings/trading", methods=["POST"])
@login_required
def settings_trading_post():
    """Redirect a POST on the trading settings URL to the ``settings`` endpoint."""
    settings_url = url_for("settings")
    return redirect(settings_url)
def hook_review_mood(conn, behavior_tags: str, exit_trigger: str, exit_supplement: str):
    """Apply risk-control side effects of a trade review entry.

    Mood issues parsed from ``behavior_tags`` trigger a journal freeze; a
    manual close that comes with a non-empty supplement reduces the cool-off.
    """
    mood_issues = parse_mood_issues(behavior_tags)
    if mood_issues:
        on_mood_journal_freeze(conn, trading_day=trading_day_label())

    trigger = (exit_trigger or "").strip()
    supplement = (exit_supplement or "").strip()
    if trigger == "手动平仓" and supplement:
        reduce_cooloff_after_journal(conn, trading_day=trading_day_label())
app._risk_review_hook = hook_review_mood
# DB_PATH is passed as db_path to the background workers started below.
from db_conn import DB_PATH
# Table-initialisation callback handed to workers as init_tables_fn.
def _init_tables(conn):
init_strategy_tables(conn)
# Start the recommend worker. Settings are supplied as zero-argument lambdas
# that call the getters with get_setting each time they are invoked.
start_recommend_worker(
db_path=DB_PATH,
get_capital_fn=_recommend_capital,
quote_fn=_main_quote,
init_tables_fn=_init_tables,
get_mode_fn=lambda: get_trading_mode(get_setting),
get_max_margin_pct_fn=lambda: get_max_margin_pct(get_setting),
get_sizing_mode_fn=lambda: get_sizing_mode(get_setting),
get_fixed_lots_fn=lambda: get_fixed_lots(get_setting),
)
# Start the CTP reconnect worker with the current-mode getter and settings reader.
start_ctp_reconnect_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
)
# Start the CTP pre-market connect worker with the same callbacks.
start_ctp_premarket_connect_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
)
# Start the stop-loss/take-profit guard worker: it receives the DB path, table
# initialiser, capital getter, trailing-breakeven tick buffer getter and the
# WeChat notifier, with interval=1.
start_sl_tp_guard_worker(
db_path=DB_PATH,
get_mode_fn=lambda: get_trading_mode(get_setting),
init_tables_fn=_init_tables,
get_capital_fn=_capital,
get_be_tick_buffer_fn=lambda: get_trailing_be_tick_buffer(get_setting),
notify_fn=send_wechat_msg,
interval=1,
)
# Mutable holders shared with _position_worker_refresh: a call counter and the
# timestamp of the last full (non-fast) snapshot.
_pos_refresh_tick = {"n": 0}
_last_full_calibrate = {"ts": 0.0}
def _position_worker_refresh() -> dict:
    """Produce the next live trading snapshot for the position worker.

    A full (``fast=False``) snapshot is taken only while CTP is connected and
    either the trading state asks for calibration or the calibrate interval
    has elapsed since the last full snapshot; otherwise a fast one is taken.
    """
    import time as _time
    from ctp_trading_state import CALIBRATE_INTERVAL_SEC

    _pos_refresh_tick["n"] += 1
    mode = get_trading_mode(get_setting)
    connected = bool(ctp_status(mode).get("connected"))
    now = _time.time()

    if not connected:
        return _refresh_trading_live_snapshot(fast=True)

    interval_elapsed_or_requested = (
        trading_state.needs_calibrate()
        or (now - _last_full_calibrate["ts"]) >= CALIBRATE_INTERVAL_SEC
    )
    if not interval_elapsed_or_requested:
        return _refresh_trading_live_snapshot(fast=True)

    # Record the calibration time before taking the full snapshot.
    _last_full_calibrate["ts"] = now
    return _refresh_trading_live_snapshot(fast=False)
start_position_worker(
    refresh_fn=_position_worker_refresh,
    interval=1,
    idle_interval=3,
)
# Bootstrap the trading runtime, then start the remaining background workers.
_bootstrap_trading_runtime()
# CTP fee worker: receives the mode getter plus settings reader and writer.
start_ctp_fee_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
set_setting_fn=set_setting,
)
from ai_worker import start_ai_worker
# AI worker: receives the DB path, settings reader/writer and WeChat sender.
start_ai_worker(
db_path=DB_PATH,
get_setting_fn=get_setting,
set_setting_fn=set_setting,
send_wechat_fn=send_wechat_msg,
)
# Pending-order worker: reconciles via _reconcile_pending and pushes a full
# (fast=False) position snapshot through on_changed_fn.
start_pending_order_worker(
db_path=DB_PATH,
get_mode_fn=lambda: get_trading_mode(get_setting),
init_tables_fn=_init_tables,
get_capital_fn=_capital,
reconcile_fn=_reconcile_pending,
on_changed_fn=lambda: _push_position_snapshot_async(fast=False),
)