Files
qihuo/app.py
T
dekun 9f48f22d16 Gate order cancel to trading hours and sync trade logs from CTP.
Disable cancel UI outside sessions, query exchange fills for records, and label local vs counterparty rows.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-26 00:35:51 +08:00

1859 lines
66 KiB
Python

import os
from locale_fix import ensure_process_locale
ensure_process_locale()
import sqlite3
import time
import threading
import requests
from datetime import datetime, timedelta
from typing import Optional
from functools import wraps
from zoneinfo import ZoneInfo
from werkzeug.utils import secure_filename
from dotenv import load_dotenv
from flask import (
Flask, render_template, request, redirect, url_for,
flash, session, jsonify, Response, stream_with_context,
)
from werkzeug.security import check_password_hash, generate_password_hash
from functools import wraps
from symbols import (
search_symbols,
ths_to_codes,
list_main_contracts_grouped,
list_recommended_symbols_grouped,
refresh_main_index,
)
from contract_specs import calc_position_metrics
from fee_specs import (
calc_fee_breakdown,
calc_round_trip_fee,
list_fee_rates_for_ui,
count_fee_rates_by_source,
purge_non_ctp_fee_rates,
)
from nav_settings import NAV_TOGGLES, get_nav_items, nav_enabled, save_nav_items
from contract_profile import get_contract_profile
from stats_engine import STATS_VIEWS, load_stats_cache, refresh_stats_cache
from kline_store import ensure_kline_tables
from kline_stream import kline_hub, sse_format
from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS
from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label
from db_conn import connect_db
from strategy.strategy_db import init_strategy_tables
from install_trading import install_trading
from vnpy_bridge import try_init_vnpy
load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env"))
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "futures_monitor_default_secret")
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "6600"))
DEBUG = os.getenv("DEBUG", "false").lower() in ("1", "true", "yes")
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db")
UPLOAD_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "uploads")
TZ = ZoneInfo("Asia/Shanghai")
OPEN_TYPES = ["突破开仓", "回调开仓", "追涨杀跌", "计划内开仓", "震荡摸顶底", "其他"]
EXIT_TRIGGERS = ["止盈", "止损", "手工平仓", "移动止损", "时间离场", "其他"]
BEHAVIOR_TAGS = ["怕踏空", "报复开仓", "盈利飘了", "拿不住单", "扛单", "重仓违规"]
KLINE_PERIODS = ["1m", "3m", "5m", "15m", "30m", "1h", "4h", "1d"]
KLINE_CUTOFFS = ["平仓时间", "开仓时间", "当前时间"]
def today_str() -> str:
return datetime.now(TZ).date().isoformat()
def calc_holding_duration(open_time: str, close_time: str) -> str:
try:
o = datetime.fromisoformat(open_time.strip().replace(" ", "T")[:19])
c = datetime.fromisoformat(close_time.strip().replace(" ", "T")[:19])
delta = c - o
if delta.total_seconds() < 0:
return ""
secs = int(delta.total_seconds())
h, rem = divmod(secs, 3600)
m, _ = divmod(rem, 60)
if h:
return f"{h}小时{m}分钟"
return f"{m}分钟"
except Exception:
return ""
def holding_to_minutes(open_time: str, close_time: str) -> int:
try:
o = datetime.fromisoformat(open_time.strip().replace(" ", "T"))
c = datetime.fromisoformat(close_time.strip().replace(" ", "T"))
secs = int((c - o).total_seconds())
return max(0, secs // 60)
except Exception:
return 0
def classify_close_result(direction: str, close: float, sl: float, tp: float) -> str:
"""根据平仓价与止损/止盈距离判断结果。"""
if close is None:
return "手动平仓"
tol = max(abs(close) * 0.002, 1.0)
if abs(close - tp) <= tol:
return "止盈"
if abs(close - sl) <= tol:
return "止损"
return "手动平仓"
def calc_rr_ratio(direction: str, entry: float, stop: float, target: float) -> Optional[float]:
"""盈亏比 = 盈利空间 / 风险空间。"""
if entry is None or stop is None or target is None:
return None
if direction == "long":
risk = entry - stop
if risk <= 0:
return None
return round((target - entry) / risk, 2)
if direction == "short":
risk = stop - entry
if risk <= 0:
return None
return round((entry - target) / risk, 2)
return None
def calc_theoretical_pnl(direction: str, entry: float, target: float, lots: float) -> Optional[float]:
if entry is None or target is None or lots is None:
return None
if direction == "long":
return round((target - entry) * lots, 2)
if direction == "short":
return round((entry - target) * lots, 2)
return None
def parse_review_date_filter(preset: str, start: str, end: str) -> tuple[str, str]:
today = datetime.now(TZ).date()
if preset == "today":
s = today.isoformat()
return s, s
if preset == "week":
monday = today - timedelta(days=today.weekday())
return monday.isoformat(), today.isoformat()
if preset == "month":
return today.replace(day=1).isoformat(), today.isoformat()
return start.strip(), end.strip()
def expire_old_plans():
"""当日结束后计划自动失效,保留历史。"""
today = today_str()
conn = get_db()
conn.execute(
"UPDATE order_plans SET status='expired' WHERE plan_date < ? AND status IN ('planned', 'active')",
(today,),
)
conn.execute(
"UPDATE order_plans SET plan_date=date(created_at) WHERE plan_date IS NULL OR plan_date=''"
)
conn.commit()
conn.close()
def get_db():
return connect_db()
def get_setting(key: str, default: str = "") -> str:
conn = get_db()
row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
conn.close()
return row["value"] if row else default
def set_setting(key: str, value: str):
conn = get_db()
conn.execute(
"INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=?",
(key, value, value),
)
conn.commit()
conn.close()
def require_nav(key: str):
"""导航项关闭时拒绝访问对应页面。"""
def decorator(f):
@wraps(f)
def wrapped(*args, **kwargs):
if not nav_enabled(get_setting, key):
flash("该页面已在系统设置中关闭")
return redirect(url_for("positions"))
return f(*args, **kwargs)
return wrapped
return decorator
@app.context_processor
def inject_globals():
trade_js = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static", "js", "trade.js")
asset_v = str(int(os.path.getmtime(trade_js))) if os.path.isfile(trade_js) else "0"
return {"nav_items": get_nav_items(get_setting), "asset_v": asset_v}
def _trading_mode() -> str:
return (get_setting("trading_mode", "simulation") or "simulation").strip()
def touch_stats_cache():
try:
conn = get_db()
capital = float(get_setting("live_capital", "0") or 0)
refresh_stats_cache(conn, capital)
conn.close()
except Exception as exc:
app.logger.warning("stats cache refresh failed: %s", exc)
def get_stats_data() -> dict:
conn = get_db()
capital = float(get_setting("live_capital", "0") or 0)
data = load_stats_cache(conn)
if not data:
data = refresh_stats_cache(conn, capital)
conn.close()
return data
def init_db():
conn = get_db()
c = conn.cursor()
c.execute("CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value TEXT)")
c.execute('''CREATE TABLE IF NOT EXISTS order_plans
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, direction TEXT,
zone_upper REAL, zone_lower REAL,
stop_loss REAL, take_profit REAL,
status TEXT DEFAULT "planned",
triggered_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS key_monitors
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT,
upper REAL, lower REAL,
upper_triggered INTEGER DEFAULT 0,
lower_triggered INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS trade_records
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT,
trigger_price REAL, stop_loss REAL, take_profit REAL,
result TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
migrations = [
"ALTER TABLE key_monitors ADD COLUMN symbol_name TEXT",
"ALTER TABLE key_monitors ADD COLUMN upper_triggered INTEGER DEFAULT 0",
"ALTER TABLE key_monitors ADD COLUMN lower_triggered INTEGER DEFAULT 0",
"ALTER TABLE trade_records ADD COLUMN symbol_name TEXT",
"ALTER TABLE order_plans ADD COLUMN sina_code TEXT",
"ALTER TABLE order_plans ADD COLUMN market_code TEXT",
"ALTER TABLE key_monitors ADD COLUMN market_code TEXT",
"ALTER TABLE key_monitors ADD COLUMN sina_code TEXT",
"ALTER TABLE trade_records ADD COLUMN market_code TEXT",
"ALTER TABLE order_plans ADD COLUMN plan_date TEXT",
"ALTER TABLE order_plans ADD COLUMN decision_reason TEXT",
"ALTER TABLE key_monitors ADD COLUMN status TEXT DEFAULT 'active'",
"ALTER TABLE key_monitors ADD COLUMN archived_at TEXT",
"ALTER TABLE review_records ADD COLUMN direction TEXT",
"ALTER TABLE review_records ADD COLUMN entry_price REAL",
"ALTER TABLE review_records ADD COLUMN stop_loss REAL",
"ALTER TABLE review_records ADD COLUMN take_profit REAL",
"ALTER TABLE review_records ADD COLUMN close_price REAL",
"ALTER TABLE review_records ADD COLUMN lots REAL",
"ALTER TABLE review_records ADD COLUMN holding_duration TEXT",
"ALTER TABLE review_records ADD COLUMN initial_pnl REAL",
"ALTER TABLE review_records ADD COLUMN actual_pnl REAL",
"ALTER TABLE review_records ADD COLUMN is_emotion INTEGER DEFAULT 0",
"ALTER TABLE review_records ADD COLUMN symbol_name TEXT",
"ALTER TABLE review_records ADD COLUMN market_code TEXT",
"ALTER TABLE review_records ADD COLUMN sina_code TEXT",
"ALTER TABLE trade_logs ADD COLUMN fee REAL",
"ALTER TABLE trade_logs ADD COLUMN pnl_net REAL",
"ALTER TABLE trade_logs ADD COLUMN margin_pct REAL",
"ALTER TABLE trade_logs ADD COLUMN equity_after REAL",
"ALTER TABLE review_records ADD COLUMN fee REAL",
"ALTER TABLE review_records ADD COLUMN pnl_net REAL",
]
for sql in migrations:
try:
c.execute(sql)
except sqlite3.OperationalError:
pass
c.execute('''CREATE TABLE IF NOT EXISTS review_records
(id INTEGER PRIMARY KEY AUTOINCREMENT,
open_time TEXT, close_time TEXT,
symbol TEXT, timeframe TEXT,
pnl REAL,
open_type TEXT, expected_rr REAL, actual_rr REAL,
exit_trigger TEXT, exit_supplement TEXT,
watch_after_breakeven TEXT, new_position_while_occupied TEXT,
screenshot TEXT,
auto_kline INTEGER DEFAULT 0,
kline_period1 TEXT, kline_period2 TEXT,
kline_count INTEGER, kline_cutoff TEXT,
behavior_tags TEXT, notes TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS position_monitors
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, market_code TEXT, sina_code TEXT,
direction TEXT, lots REAL, entry_price REAL,
stop_loss REAL, take_profit REAL, open_time TEXT,
status TEXT DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS trade_logs
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, market_code TEXT, sina_code TEXT,
monitor_type TEXT, direction TEXT,
entry_price REAL, stop_loss REAL, take_profit REAL, close_price REAL,
lots REAL, margin REAL, holding_minutes INTEGER,
open_time TEXT, close_time TEXT,
pnl REAL, result TEXT,
verified INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS fee_rates
(product TEXT PRIMARY KEY,
exchange TEXT,
mult INTEGER,
open_fixed REAL DEFAULT 0,
open_ratio REAL DEFAULT 0,
close_yesterday_fixed REAL DEFAULT 0,
close_yesterday_ratio REAL DEFAULT 0,
close_today_fixed REAL DEFAULT 0,
close_today_ratio REAL DEFAULT 0,
updated_at TEXT)''')
c.execute('''CREATE TABLE IF NOT EXISTS stats_cache
(key TEXT PRIMARY KEY,
data_json TEXT NOT NULL,
updated_at TEXT NOT NULL)''')
for sql in (
"ALTER TABLE fee_rates ADD COLUMN source TEXT DEFAULT 'local'",
):
try:
c.execute(sql)
except sqlite3.OperationalError:
pass
ensure_kline_tables(conn)
init_strategy_tables(conn)
from risk.account_risk_lib import ensure_account_risk_schema
from recommend_store import ensure_recommend_tables
ensure_account_risk_schema(conn)
ensure_recommend_tables(conn)
conn.commit()
conn.close()
sync_admin_from_env()
if not get_setting("wechat_webhook") and os.getenv("WECHAT_WEBHOOK"):
set_setting("wechat_webhook", os.getenv("WECHAT_WEBHOOK"))
if not get_setting("ths_refresh_token") and os.getenv("THS_REFRESH_TOKEN"):
set_setting("ths_refresh_token", os.getenv("THS_REFRESH_TOKEN"))
from ctp_settings import seed_ctp_settings_from_env
seed_ctp_settings_from_env(set_setting)
os.makedirs(UPLOAD_DIR, exist_ok=True)
expire_old_plans()
if not get_setting("fee_multiplier"):
set_setting("fee_multiplier", "2")
if not get_setting("trading_mode"):
set_setting("trading_mode", "simulation")
if not get_setting("position_sizing_mode"):
set_setting("position_sizing_mode", "fixed")
if not get_setting("fixed_lots"):
set_setting("fixed_lots", "1")
if not get_setting("fixed_amount"):
set_setting("fixed_amount", "5000")
if not get_setting("risk_percent"):
set_setting("risk_percent", "1")
if not get_setting("max_margin_pct"):
set_setting("max_margin_pct", "30")
if not get_setting("trailing_be_tick_buffer"):
set_setting("trailing_be_tick_buffer", "2")
if not get_setting("pending_order_timeout_min"):
set_setting("pending_order_timeout_min", "5")
if not get_setting("fee_source_mode"):
set_setting("fee_source_mode", "ctp")
set_setting("fee_source_mode", "ctp")
try:
purge_non_ctp_fee_rates()
except Exception:
pass
def sync_admin_from_env():
"""
从 .env 同步管理员账号。
- 首次建库:自动写入 ADMIN_USERNAME / ADMIN_PASSWORD
- 已建库后改 .env:需设 ADMIN_SYNC_FROM_ENV=true 并重启服务
"""
sync = os.getenv("ADMIN_SYNC_FROM_ENV", "false").lower() in ("1", "true", "yes")
env_username = os.getenv("ADMIN_USERNAME", "").strip()
env_password = os.getenv("ADMIN_PASSWORD", "").strip()
placeholder_passwords = {"", "change-me-on-first-login", "admin123"}
if not get_setting("admin_username"):
username = env_username or "admin"
password = env_password if env_password not in placeholder_passwords else "admin123"
set_setting("admin_username", username)
set_setting("admin_password_hash", generate_password_hash(password))
return
if not sync:
return
if env_username:
set_setting("admin_username", env_username)
if env_password and env_password not in placeholder_passwords:
set_setting("admin_password_hash", generate_password_hash(env_password))
init_db()
def sync_ths_token():
set_ths_refresh_token(get_setting("ths_refresh_token"))
sync_ths_token()
def build_market_quote_payload(
symbol: str,
market_code: str = "",
sina_code: str = "",
) -> dict:
if not market_code or not sina_code:
codes = ths_to_codes(symbol)
if codes:
market_code = codes.get("market_code", "") or market_code
sina_code = codes.get("sina_code", "") or sina_code
quote_source = "sina"
price = None
prev_close = None
try:
from vnpy_bridge import ctp_status, ctp_get_tick_detail
from trading_context import get_trading_mode
mode = get_trading_mode(get_setting)
if ctp_status(mode).get("connected"):
detail = ctp_get_tick_detail(mode, symbol)
if detail.get("price"):
price = detail["price"]
quote_source = "ctp"
if detail.get("pre_close") is not None:
prev_close = detail["pre_close"]
except Exception:
pass
if price is None:
price = fetch_price(symbol, market_code, sina_code)
name = symbol
codes = ths_to_codes(symbol)
if codes:
name = codes.get("name", symbol)
if prev_close is None and sina_code:
from market import fetch_raw_for_volume
raw = fetch_raw_for_volume(sina_code)
if raw and raw.get("prev_close") is not None:
prev_close = raw["prev_close"]
return {
"symbol": symbol,
"name": name,
"price": price,
"prev_close": prev_close,
"quote_source": quote_source,
}
# —————————————— 推送 ——————————————
def send_wechat_msg(content: str):
webhook = get_setting("wechat_webhook")
if not webhook:
return
full = f"【国内期货】\n{content}"
data = {"msgtype": "text", "text": {"content": full}}
try:
requests.post(webhook, json=data, timeout=10)
except Exception:
pass
# —————————————— 行情 ——————————————
def resolve_market_codes(ths_code: str, market_code: str = "", sina_code: str = "") -> tuple[str, str]:
"""返回 (market_code, sina_code) 用于行情拉取。"""
if market_code:
return market_code, sina_code
if sina_code and "." in sina_code:
return sina_code, ""
codes = ths_to_codes(ths_code)
if codes:
return codes["market_code"], codes["sina_code"]
if ths_code.startswith("nf_") or ths_code.startswith("CFF_RE_"):
return ths_code, ths_code
return "", sina_code or ""
def fetch_price(ths_code: str, market_code: str = "", sina_code: str = "") -> Optional[float]:
sym = (ths_code or "").strip()
if sym:
try:
from vnpy_bridge import ctp_status, ctp_get_tick_price
from trading_context import get_trading_mode
mode = get_trading_mode(get_setting)
if ctp_status(mode).get("connected"):
p = ctp_get_tick_price(mode, sym)
if p and p > 0:
return p
except Exception:
pass
mc, sc = resolve_market_codes(sym, market_code, sina_code)
if not mc and not sc:
return None
return market_get_price(mc, sc)
# —————————————— 监控逻辑 ——————————————
def check_order_plans():
expire_old_plans()
today = today_str()
conn = get_db()
rows = conn.execute(
"SELECT * FROM order_plans WHERE plan_date=? AND status IN ('planned', 'active')",
(today,),
).fetchall()
for r in rows:
sym = r["symbol"]
sina = r["sina_code"] if "sina_code" in r.keys() else ""
market = r["market_code"] if "market_code" in r.keys() else ""
p = fetch_price(sym, market, sina)
if not p:
continue
direction = r["direction"]
zone_upper = r["zone_upper"]
zone_lower = r["zone_lower"]
stop_loss = r["stop_loss"]
take_profit = r["take_profit"]
status = r["status"]
pid = r["id"]
name = r["symbol_name"] or sym
reason = r["decision_reason"] if "decision_reason" in r.keys() and r["decision_reason"] else ""
# 计划状态:价格进入决策区间则激活并通知
if status == "planned":
in_zone = zone_lower <= p <= zone_upper
if in_zone:
msg = (
f"【开单计划触发】{name} ({sym})\n"
f"方向:{'做多' if direction == 'long' else '做空'}\n"
f"决策区间:{zone_lower} ~ {zone_upper}\n"
f"决策理由:{reason}\n"
f"当前价:{p}\n"
f"止损:{stop_loss} 止盈:{take_profit}"
)
send_wechat_msg(msg)
conn.execute(
"UPDATE order_plans SET status='active', triggered_at=? WHERE id=?",
(datetime.now().isoformat(), pid),
)
status = "active"
# 激活状态:监控止盈止损
if status == "active":
res = None
if direction == "long":
if p >= take_profit:
res = "止盈"
elif p <= stop_loss:
res = "止损"
elif direction == "short":
if p <= take_profit:
res = "止盈"
elif p >= stop_loss:
res = "止损"
if res:
msg = (
f"[{'做多' if direction == 'long' else '做空'}] {name}{res}\n"
f"决策区间:{zone_lower} ~ {zone_upper}\n"
f"止损:{stop_loss} 止盈:{take_profit}\n"
f"当前价:{p}"
)
send_wechat_msg(msg)
conn.execute(
"""INSERT INTO trade_records
(symbol, symbol_name, monitor_type, direction,
trigger_price, stop_loss, take_profit, result)
VALUES (?,?,?,?,?,?,?,?)""",
(sym, name, "开单计划", direction, p, stop_loss, take_profit, res),
)
conn.execute(
"UPDATE order_plans SET status='closed' WHERE id=?", (pid,)
)
conn.commit()
conn.close()
def check_key_monitors():
conn = get_db()
rows = conn.execute(
"SELECT * FROM key_monitors WHERE status='active' OR status IS NULL"
).fetchall()
for r in rows:
sym = r["symbol"]
typ = r["monitor_type"]
up = r["upper"]
low = r["lower"]
up_trig = r["upper_triggered"]
low_trig = r["lower_triggered"]
name = r["symbol_name"] or sym
pid = r["id"]
sina = r["sina_code"] if "sina_code" in r.keys() else ""
market = r["market_code"] if "market_code" in r.keys() else ""
p = fetch_price(sym, market, sina)
if not p:
continue
if typ in ("箱体突破", "收敛突破"):
if p > up and not up_trig:
send_wechat_msg(f"{name} 突破{typ}上沿 {up}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET upper_triggered=1 WHERE id=?", (pid,)
)
if p < low and not low_trig:
send_wechat_msg(f"{name} 跌破{typ}下沿 {low}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET lower_triggered=1 WHERE id=?", (pid,)
)
elif typ == "关键阻力位" and p > up and not up_trig:
send_wechat_msg(f"{name} 突破阻力位 {up}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET upper_triggered=1 WHERE id=?", (pid,)
)
elif typ == "关键支撑位" and p < low and not low_trig:
send_wechat_msg(f"{name} 跌破支撑位 {low}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET lower_triggered=1 WHERE id=?", (pid,)
)
conn.commit()
conn.close()
def background_task():
while True:
try:
expire_old_plans()
check_key_monitors()
check_order_plans()
fn = getattr(app, "_check_trend_plans", None)
if fn:
fn(app)
except Exception:
pass
time.sleep(3)
def start_background_threads():
from trading_context import get_trading_mode
threading.Thread(target=background_task, daemon=True).start()
threading.Thread(
target=lambda: kline_hub.worker_loop(
DB_PATH,
build_market_quote_payload,
get_mode_fn=lambda: get_trading_mode(get_setting),
),
daemon=True,
).start()
threading.Thread(target=refresh_main_index, daemon=True).start()
# —————————————— 登录 ——————————————
def login_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("login"))
return f(*args, **kwargs)
return wrap
@app.route("/")
def index():
if session.get("logged_in"):
return redirect(url_for("plans"))
return redirect(url_for("login"))
@app.route("/manifest.webmanifest")
def web_manifest():
response = app.send_static_file("manifest.json")
response.mimetype = "application/manifest+json"
return response
@app.route("/sw.js")
def service_worker():
response = app.send_static_file("sw.js")
response.headers["Cache-Control"] = "no-cache"
response.headers["Service-Worker-Allowed"] = "/"
return response
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
u = request.form.get("username", "").strip()
p = request.form.get("password", "")
admin_u = get_setting("admin_username")
admin_hash = get_setting("admin_password_hash")
if u == admin_u and check_password_hash(admin_hash, p):
session["logged_in"] = True
session["username"] = u
return redirect(url_for("plans"))
flash("账号或密码错误")
return render_template("login.html")
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
# —————————————— API ——————————————
@app.route("/api/symbols/search")
@login_required
def api_symbol_search():
q = request.args.get("q", "")
return jsonify(search_symbols(q))
@app.route("/api/symbols/mains")
@login_required
def api_symbols_mains():
return jsonify(list_main_contracts_grouped())
@app.route("/api/symbols/recommended")
@login_required
def api_symbols_recommended():
"""品种下拉:仅展示当前资金下推荐的品种(与下方品种推荐表一致)。"""
from recommend_store import recommend_payload
from trading_context import (
get_account_capital,
get_fixed_lots,
get_max_margin_pct,
get_sizing_mode,
get_trading_mode,
)
conn = get_db()
try:
capital = get_account_capital(conn, get_setting)
payload = recommend_payload(
conn,
live_capital=capital,
max_margin_pct=get_max_margin_pct(get_setting),
trading_mode=get_trading_mode(get_setting),
sizing_mode=get_sizing_mode(get_setting),
fixed_lots=get_fixed_lots(get_setting),
)
return jsonify(list_recommended_symbols_grouped(payload.get("rows") or []))
finally:
conn.close()
@app.route("/api/key_prices")
@login_required
def api_key_prices():
"""关键位监控列表:批量现价与距上/下沿距离。"""
conn = get_db()
rows = conn.execute(
"SELECT id, symbol, market_code, sina_code, upper, lower "
"FROM key_monitors WHERE status='active' OR status IS NULL"
).fetchall()
conn.close()
out = []
for r in rows:
sym = r["symbol"]
market = r["market_code"] or ""
sina = r["sina_code"] or ""
upper = float(r["upper"])
lower = float(r["lower"])
price = fetch_price(sym, market, sina)
dist_upper = None
dist_lower = None
if price is not None:
dist_upper = round(upper - price, 2)
dist_lower = round(price - lower, 2)
out.append({
"id": r["id"],
"price": price,
"dist_upper": dist_upper,
"dist_lower": dist_lower,
})
return jsonify(out)
@app.route("/api/plan_prices")
@login_required
def api_plan_prices():
"""今日计划:批量现价与距决策区间上/下沿距离。"""
today = today_str()
conn = get_db()
rows = conn.execute(
"SELECT id, symbol, market_code, sina_code, zone_upper, zone_lower "
"FROM order_plans WHERE plan_date=? AND status IN ('planned', 'active')",
(today,),
).fetchall()
conn.close()
out = []
for r in rows:
sym = r["symbol"]
market = r["market_code"] or ""
sina = r["sina_code"] or ""
upper = float(r["zone_upper"])
lower = float(r["zone_lower"])
price = fetch_price(sym, market, sina)
dist_upper = None
dist_lower = None
in_zone = False
if price is not None:
dist_upper = round(upper - price, 2)
dist_lower = round(price - lower, 2)
in_zone = lower <= price <= upper
out.append({
"id": r["id"],
"price": price,
"dist_upper": dist_upper,
"dist_lower": dist_lower,
"in_zone": in_zone,
})
return jsonify(out)
@app.route("/api/position_live")
@login_required
def api_position_live():
capital = float(get_setting("live_capital", "0") or 0)
now_iso = datetime.now(TZ).strftime("%Y-%m-%dT%H:%M")
conn = get_db()
rows = conn.execute(
"SELECT * FROM position_monitors WHERE status='active' ORDER BY id DESC"
).fetchall()
conn.close()
out = []
for r in rows:
sym = r["symbol"]
market = r["market_code"] or ""
sina = r["sina_code"] or ""
direction = r["direction"]
entry = float(r["entry_price"])
sl = float(r["stop_loss"])
tp = float(r["take_profit"])
lots = float(r["lots"] or 1)
mark = fetch_price(sym, market, sina)
metrics = calc_position_metrics(
direction, entry, sl, tp, lots, mark, capital, sym,
)
holding = calc_holding_duration(r["open_time"] or "", now_iso)
close_est = mark if mark is not None else entry
fee_info = calc_fee_breakdown(
sym, entry, close_est, lots, r["open_time"] or "", now_iso,
trading_mode=_trading_mode(),
)
est_net = None
if metrics.get("float_pnl") is not None:
est_net = round(metrics["float_pnl"] - fee_info["total_fee"], 2)
out.append({
"id": r["id"],
"symbol": r["symbol_name"] or sym,
"symbol_code": sym,
"direction": "做多" if direction == "long" else "做空",
"lots": lots,
"entry_price": entry,
"stop_loss": sl,
"take_profit": tp,
"open_time": r["open_time"],
"mark_price": mark,
"holding_duration": holding,
"est_fee": fee_info["total_fee"],
"est_fee_open": fee_info["open_fee"],
"est_fee_close": fee_info["close_fee"],
"est_fee_close_type": fee_info["close_type"],
"est_pnl_net": est_net,
**metrics,
})
return jsonify(out)
@login_required
def index():
if nav_enabled(get_setting, "plans"):
return redirect(url_for("plans"))
return redirect(url_for("positions"))
@app.route("/plans")
@login_required
@require_nav("plans")
def plans():
today = today_str()
start = request.args.get("start", "")
end = request.args.get("end", "")
conn = get_db()
plan_list = conn.execute(
"SELECT * FROM order_plans WHERE plan_date=? AND status IN ('planned', 'active') ORDER BY id DESC",
(today,),
).fetchall()
sql = "SELECT * FROM order_plans WHERE plan_date < ? OR status IN ('closed', 'expired')"
params: list = [today]
if start:
sql += " AND plan_date >= ?"
params.append(start)
if end:
sql += " AND plan_date <= ?"
params.append(end)
sql += " ORDER BY plan_date DESC, id DESC LIMIT 200"
history = conn.execute(sql, params).fetchall()
conn.close()
return render_template(
"plans.html",
plans=plan_list,
history=history,
today=today,
start=start,
end=end,
)
@app.route("/add_plan", methods=["POST"])
@login_required
def add_plan():
d = request.form
direction = d.get("direction")
symbol = d.get("symbol", "").strip()
symbol_name = d.get("symbol_name", "").strip()
market_code = d.get("market_code", "").strip()
sina_code = d.get("sina_code", "").strip()
if not direction:
flash("请选择多空方向")
return redirect(url_for("plans"))
if not symbol or not market_code:
flash("请从下拉列表选择品种(同花顺合约代码)")
return redirect(url_for("plans"))
conn = get_db()
conn.execute(
"""INSERT INTO order_plans
(symbol, symbol_name, market_code, sina_code, direction,
zone_upper, zone_lower, stop_loss, take_profit, plan_date, decision_reason)
VALUES (?,?,?,?,?,?,?,?,?,?,?)""",
(
symbol, symbol_name, market_code, sina_code, direction,
float(d["zone_upper"]), float(d["zone_lower"]),
float(d["stop_loss"]), float(d["take_profit"]),
today_str(),
d.get("decision_reason", "").strip(),
),
)
conn.commit()
conn.close()
flash("开单计划已添加")
return redirect(url_for("plans"))
@app.route("/del_plan/<int:pid>")
@login_required
def del_plan(pid):
conn = get_db()
conn.execute("DELETE FROM order_plans WHERE id=?", (pid,))
conn.commit()
conn.close()
flash("已删除")
return redirect(url_for("plans"))
@app.route("/keys")
@login_required
def keys():
conn = get_db()
key_list = conn.execute(
"SELECT * FROM key_monitors WHERE status='active' OR status IS NULL ORDER BY id DESC"
).fetchall()
history = conn.execute(
"SELECT * FROM key_monitors WHERE status='archived' ORDER BY archived_at DESC LIMIT 100"
).fetchall()
conn.close()
return render_template("keys.html", keys=key_list, history=history)
@app.route("/add_key", methods=["POST"])
@login_required
def add_key():
d = request.form
direction = d.get("direction")
symbol = d.get("symbol", "").strip()
symbol_name = d.get("symbol_name", "").strip()
market_code = d.get("market_code", "").strip()
sina_code = d.get("sina_code", "").strip()
if not direction:
flash("请选择多空方向")
return redirect(url_for("keys"))
if not symbol or not market_code:
flash("请从下拉列表选择品种(同花顺合约代码)")
return redirect(url_for("keys"))
conn = get_db()
conn.execute(
"""INSERT INTO key_monitors
(symbol, symbol_name, market_code, sina_code, monitor_type, direction, upper, lower)
VALUES (?,?,?,?,?,?,?,?)""",
(symbol, symbol_name, market_code, sina_code, d["type"], direction, float(d["upper"]), float(d["lower"])),
)
conn.commit()
conn.close()
flash("关键位监控已添加")
return redirect(url_for("keys"))
@app.route("/add_position", methods=["POST"])
@login_required
def add_position():
flash("持仓由策略交易或 CTP 自动同步,无需手工录入")
return redirect(url_for("positions"))
@app.route("/del_position/<int:pid>")
@login_required
def del_position(pid):
return close_position(pid)
@app.route("/close_position/<int:pid>", methods=["POST"])
@login_required
def close_position(pid):
conn = get_db()
row = conn.execute("SELECT * FROM position_monitors WHERE id=?", (pid,)).fetchone()
if not row:
conn.close()
flash("持仓不存在")
return redirect(url_for("positions"))
sym = row["symbol"]
market = row["market_code"] or ""
sina = row["sina_code"] or ""
direction = row["direction"]
entry = float(row["entry_price"])
sl = float(row["stop_loss"])
tp = float(row["take_profit"])
lots = float(row["lots"] or 1)
open_time = row["open_time"] or ""
close_time = datetime.now(TZ).strftime("%Y-%m-%dT%H:%M")
close_price = fetch_price(sym, market, sina)
if close_price is None:
conn.close()
flash("无法获取现价,平仓失败")
return redirect(url_for("positions"))
capital = float(get_setting("live_capital", "0") or 0)
metrics = calc_position_metrics(direction, entry, sl, tp, lots, close_price, capital, sym)
pnl = metrics.get("float_pnl") or 0.0
fee = calc_round_trip_fee(sym, entry, close_price, lots, open_time, close_time, trading_mode=_trading_mode())
pnl_net = round(pnl - fee, 2)
result = classify_close_result(direction, close_price, sl, tp)
minutes = holding_to_minutes(open_time, close_time)
margin_pct = metrics.get("position_pct")
from trade_log_lib import calc_equity_after
equity_after = calc_equity_after(capital, pnl_net)
conn.execute(
"""INSERT INTO trade_logs
(symbol, symbol_name, market_code, sina_code, monitor_type, direction,
entry_price, stop_loss, take_profit, close_price, lots, margin,
margin_pct, holding_minutes, open_time, close_time, pnl, fee, pnl_net,
equity_after, result)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
sym, row["symbol_name"], market, sina, "持仓监控", direction,
entry, sl, tp, close_price, lots, metrics["margin"],
margin_pct,
minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result,
),
)
conn.execute("DELETE FROM position_monitors WHERE id=?", (pid,))
conn.commit()
conn.close()
touch_stats_cache()
flash(f"已平仓,盈亏 {pnl:.2f} 元(扣费后 {pnl_net:.2f} 元),已记入交易记录")
return redirect(url_for("positions"))
@app.route("/trades")
@login_required
def trades():
return redirect(url_for("records"))
@app.route("/update_trade/<int:tid>", methods=["POST"])
@login_required
def update_trade(tid):
d = request.form
conn = get_db()
conn.execute(
"""UPDATE trade_logs SET
symbol_name=?, monitor_type=?, direction=?,
entry_price=?, stop_loss=?, take_profit=?, close_price=?,
lots=?, margin=?, holding_minutes=?, open_time=?, close_time=?,
pnl=?, result=?, verified=1
WHERE id=?""",
(
d.get("symbol_name", "").strip(),
d.get("monitor_type", "").strip(),
d.get("direction", "").strip(),
float(d.get("entry_price") or 0),
float(d.get("stop_loss") or 0),
float(d.get("take_profit") or 0),
float(d.get("close_price") or 0),
float(d.get("lots") or 0),
float(d.get("margin") or 0),
int(d.get("holding_minutes") or 0),
d.get("open_time", "").strip(),
d.get("close_time", "").strip(),
float(d.get("pnl") or 0),
d.get("result", "").strip(),
tid,
),
)
conn.commit()
conn.close()
touch_stats_cache()
flash("交易记录已核对保存")
return redirect(url_for("records"))
@app.route("/del_trade/<int:tid>")
@login_required
def del_trade(tid):
conn = get_db()
conn.execute("DELETE FROM trade_logs WHERE id=?", (tid,))
conn.commit()
conn.close()
touch_stats_cache()
flash("已删除")
return redirect(url_for("records"))
@app.route("/fill_review/<int:tid>")
@login_required
def fill_review_from_trade(tid):
conn = get_db()
row = conn.execute("SELECT * FROM trade_logs WHERE id=?", (tid,)).fetchone()
conn.close()
if not row:
flash("记录不存在")
return redirect(url_for("records"))
q = {
"symbol": row["symbol"],
"symbol_name": row["symbol_name"] or row["symbol"],
"market_code": row["market_code"] or "",
"sina_code": row["sina_code"] or "",
"direction": row["direction"],
"entry_price": row["entry_price"],
"stop_loss": row["stop_loss"],
"take_profit": row["take_profit"],
"close_price": row["close_price"],
"lots": row["lots"],
"open_time": row["open_time"],
"close_time": row["close_time"],
"pnl": row["pnl"],
}
params = {k: v for k, v in q.items() if v is not None}
return redirect(url_for("records", **params) + "#review-panel")
@app.route("/del_key/<int:pid>")
@login_required
def del_key(pid):
conn = get_db()
conn.execute(
"UPDATE key_monitors SET status='archived', archived_at=? WHERE id=?",
(datetime.now(TZ).isoformat(), pid),
)
conn.commit()
conn.close()
flash("已移入监控历史")
return redirect(url_for("keys"))
@app.route("/records")
@login_required
def records():
preset = request.args.get("preset", "")
start = request.args.get("start", "")
end = request.args.get("end", "")
if preset:
start, end = parse_review_date_filter(preset, start, end)
conn = get_db()
ctp_sync_info = None
try:
from ctp_trade_sync import sync_trade_logs_from_ctp
from trading_context import get_account_capital, get_trading_mode
from vnpy_bridge import ctp_status
mode = get_trading_mode(get_setting)
if ctp_status(mode).get("connected"):
capital = get_account_capital(conn, get_setting)
ctp_sync_info = sync_trade_logs_from_ctp(
conn, mode, capital=capital, trading_mode=mode,
)
conn.commit()
except Exception as exc:
app.logger.warning("ctp trade sync on records page: %s", exc)
sql = "SELECT * FROM review_records WHERE 1=1"
params: list = []
if start:
sql += " AND date(close_time) >= ?"
params.append(start)
if end:
sql += " AND date(close_time) <= ?"
params.append(end)
sql += " ORDER BY id DESC LIMIT 200"
review_list = conn.execute(sql, params).fetchall()
auto_list = conn.execute(
"SELECT * FROM trade_records ORDER BY id DESC LIMIT 30"
).fetchall()
trade_list = conn.execute(
"SELECT * FROM trade_logs ORDER BY id DESC LIMIT 500"
).fetchall()
from trade_log_lib import enrich_trades_for_records
try:
initial_capital = float(get_setting("live_capital", "0") or 0)
except (TypeError, ValueError):
initial_capital = 0.0
trades, equity_curve = enrich_trades_for_records(
[dict(r) for r in trade_list],
initial_capital=initial_capital,
)
conn.close()
trade_prefill_keys = (
"symbol", "symbol_name", "market_code", "sina_code", "direction",
"entry_price", "stop_loss", "take_profit", "close_price",
"lots", "open_time", "close_time", "pnl",
)
prefill = {k: request.args.get(k) for k in trade_prefill_keys if request.args.get(k)}
return render_template(
"records.html",
reviews=review_list,
trades=trades,
equity_curve=equity_curve,
auto_records=auto_list,
ctp_sync_info=ctp_sync_info,
preset=preset,
start=start,
end=end,
prefill=prefill,
open_types=OPEN_TYPES,
exit_triggers=EXIT_TRIGGERS,
behavior_tags=BEHAVIOR_TAGS,
kline_periods=KLINE_PERIODS,
kline_cutoffs=KLINE_CUTOFFS,
)
@app.route("/add_review", methods=["POST"])
@login_required
def add_review():
d = request.form
open_type = d.get("open_type", "").strip()
exit_trigger = d.get("exit_trigger", "").strip()
if not open_type:
flash("请选择开仓类型")
return redirect(url_for("records"))
if not exit_trigger:
flash("请选择离场触发")
return redirect(url_for("records"))
symbol = d.get("symbol", "").strip()
symbol_name = d.get("symbol_name", "").strip()
market_code = d.get("market_code", "").strip()
sina_code = d.get("sina_code", "").strip()
if not symbol or not market_code:
flash("请从下拉列表选择品种(同花顺合约代码)")
return redirect(url_for("records"))
screenshot = ""
f = request.files.get("screenshot")
if f and f.filename:
fname = secure_filename(f.filename)
ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S")
screenshot = f"{ts}_{fname}"
f.save(os.path.join(UPLOAD_DIR, screenshot))
tags = [t for t in BEHAVIOR_TAGS if d.get(f"tag_{t}")]
is_emotion = 1 if tags else 0
def num(key: str) -> Optional[float]:
v = d.get(key, "").strip()
if not v:
return None
return float(v)
open_time = d.get("open_time", "").strip()
close_time = d.get("close_time", "").strip()
direction = d.get("direction", "").strip()
entry_price = num("entry_price")
stop_loss = num("stop_loss")
take_profit = num("take_profit")
close_price = num("close_price")
lots = num("lots") or 1.0
holding = calc_holding_duration(open_time, close_time)
initial_pnl = calc_rr_ratio(direction, entry_price, stop_loss, take_profit)
actual_pnl = calc_rr_ratio(direction, entry_price, stop_loss, close_price)
gross_pnl = num("pnl")
if gross_pnl is None and entry_price and close_price:
spec_mult = calc_position_metrics(
direction, entry_price, stop_loss, take_profit,
lots, close_price, 0, symbol,
)
gross_pnl = spec_mult.get("float_pnl")
fee = calc_round_trip_fee(
symbol, entry_price or 0, close_price or 0, lots, open_time, close_time,
trading_mode=_trading_mode(),
)
pnl_net = round((gross_pnl or 0) - fee, 2) if gross_pnl is not None else None
auto_kline = bool(d.get("auto_kline"))
if auto_kline and not screenshot:
try:
generated = generate_review_kline_chart(
symbol=symbol,
periods=[d.get("kline_period1", "15m"), d.get("kline_period2", "1h")],
count=int(d.get("kline_count") or 300),
cutoff_label=d.get("kline_cutoff", "平仓时间"),
open_time=open_time,
close_time=close_time,
entry_price=entry_price,
stop_loss=stop_loss,
take_profit=take_profit,
close_price=close_price,
upload_dir=UPLOAD_DIR,
)
if generated:
screenshot = generated
except Exception as exc:
app.logger.warning("auto kline failed: %s", exc)
conn = get_db()
conn.execute(
"""INSERT INTO review_records
(open_time, close_time, symbol, symbol_name, market_code, sina_code,
timeframe, direction,
entry_price, stop_loss, take_profit, close_price, lots,
holding_duration, initial_pnl, actual_pnl, pnl, fee, pnl_net,
open_type, expected_rr, actual_rr, exit_trigger, exit_supplement,
watch_after_breakeven, new_position_while_occupied, screenshot,
auto_kline, kline_period1, kline_period2, kline_count, kline_cutoff,
behavior_tags, is_emotion, notes)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
open_time, close_time,
symbol, symbol_name, market_code, sina_code,
d.get("timeframe", "").strip(),
direction,
entry_price, stop_loss, take_profit, close_price, lots,
holding, initial_pnl, actual_pnl, gross_pnl, fee, pnl_net,
open_type,
None,
None,
exit_trigger,
d.get("exit_supplement", "").strip(),
d.get("watch_after_breakeven", ""),
d.get("new_position_while_occupied", ""),
screenshot,
1 if auto_kline else 0,
d.get("kline_period1", "15m"),
d.get("kline_period2", "1h"),
int(d.get("kline_count") or 300),
d.get("kline_cutoff", "平仓时间"),
",".join(tags),
is_emotion,
d.get("notes", "").strip(),
),
)
hook = getattr(app, "_risk_review_hook", None)
if hook:
hook(
conn,
",".join(tags),
exit_trigger,
d.get("exit_supplement", "").strip(),
)
conn.commit()
conn.close()
touch_stats_cache()
flash("复盘记录已保存")
return redirect(url_for("records"))
@app.route("/del_review/<int:rid>")
@login_required
def del_review(rid):
conn = get_db()
row = conn.execute("SELECT screenshot FROM review_records WHERE id=?", (rid,)).fetchone()
if row and row["screenshot"]:
path = os.path.join(UPLOAD_DIR, row["screenshot"])
if os.path.isfile(path):
os.remove(path)
conn.execute("DELETE FROM review_records WHERE id=?", (rid,))
conn.commit()
conn.close()
touch_stats_cache()
flash("已删除")
return redirect(url_for("records"))
@app.route("/uploads/<path:filename>")
@login_required
def uploaded_file(filename):
from flask import send_from_directory
return send_from_directory(UPLOAD_DIR, filename)
@app.route("/del_record/<int:rid>")
@login_required
def del_record(rid):
conn = get_db()
conn.execute("DELETE FROM trade_records WHERE id=?", (rid,))
conn.commit()
conn.close()
flash("已删除")
return redirect(url_for("records"))
@app.route("/stats")
@login_required
def stats():
return render_template("stats.html")
@app.route("/api/stats")
@login_required
def api_stats():
return jsonify(get_stats_data())
@app.route("/api/stats/views")
@login_required
def api_stats_views():
return jsonify({"views": STATS_VIEWS})
@app.route("/api/stats/refresh", methods=["POST"])
@login_required
def api_stats_refresh():
conn = get_db()
capital = float(get_setting("live_capital", "0") or 0)
data = refresh_stats_cache(conn, capital)
conn.close()
return jsonify(data)
@app.route("/market")
@login_required
@require_nav("market")
def market_page():
symbol = request.args.get("symbol", "").strip()
period = request.args.get("period", "15m").strip()
valid = {p["key"] for p in MARKET_PERIODS}
if period not in valid:
period = "15m"
ctp_st = {}
try:
from vnpy_bridge import ctp_status
from trading_context import get_trading_mode
ctp_st = ctp_status(get_trading_mode(get_setting))
except Exception:
pass
return render_template(
"market.html",
symbol=symbol,
period=period,
market_periods=MARKET_PERIODS,
quote_label=get_quote_source_label(ctp_connected=bool(ctp_st.get("connected"))),
ctp_connected=bool(ctp_st.get("connected")),
)
@app.route("/api/kline")
@login_required
def api_kline():
symbol = request.args.get("symbol", "").strip()
period = request.args.get("period", "15m").strip()
if not symbol:
return jsonify({"error": "请提供合约代码"}), 400
try:
from trading_context import get_trading_mode
data = fetch_market_klines(
symbol, period, DB_PATH, trading_mode=get_trading_mode(get_setting),
)
except Exception as exc:
app.logger.warning("kline api failed: %s", exc)
return jsonify({"error": str(exc)}), 500
if not data.get("chart_symbol"):
return jsonify({"error": "无法识别合约代码"}), 400
if not data.get("bars"):
return jsonify({"error": "未获取到K线数据,请稍后重试或更换合约"}), 404
return jsonify(data)
@app.route("/api/kline/stream")
@login_required
def api_kline_stream():
from queue import Empty
symbol = request.args.get("symbol", "").strip()
period = request.args.get("period", "15m").strip()
market_code = request.args.get("market_code", "").strip()
sina_code = request.args.get("sina_code", "").strip()
if not symbol:
return jsonify({"error": "请提供合约代码"}), 400
def generate():
from trading_context import get_trading_mode
mode = get_trading_mode(get_setting)
sub = kline_hub.subscribe(symbol, period, market_code, sina_code)
try:
kline_data = fetch_market_klines(
symbol, period, DB_PATH, trading_mode=mode,
)
if kline_data.get("bars"):
yield sse_format("kline", kline_data)
yield sse_format(
"quote",
build_market_quote_payload(symbol, market_code, sina_code),
)
while True:
try:
msg = sub.queue.get(timeout=20)
yield sse_format(msg["event"], msg["data"])
except Empty:
yield ": heartbeat\n\n"
finally:
kline_hub.unsubscribe(sub)
return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.route("/api/market_quote")
@login_required
def api_market_quote():
symbol = request.args.get("symbol", "").strip()
market_code = request.args.get("market_code", "").strip()
sina_code = request.args.get("sina_code", "").strip()
if not symbol and not market_code:
return jsonify({"error": "请提供合约"}), 400
return jsonify(build_market_quote_payload(symbol, market_code, sina_code))
@app.route("/contract")
@login_required
@require_nav("contract")
def contract_profile_page():
symbol = request.args.get("symbol", "").strip()
profile = None
error = None
if symbol:
try:
profile = get_contract_profile(symbol)
if not profile:
error = "未查询到该合约简介,请检查合约代码"
except Exception as exc:
app.logger.warning("contract profile failed: %s", exc)
error = f"查询失败:{exc}"
return render_template(
"contract.html",
symbol=symbol,
profile=profile,
error=error,
)
@app.route("/api/contract_profile")
@login_required
def api_contract_profile():
symbol = request.args.get("symbol", "").strip()
if not symbol:
return jsonify({"error": "请提供合约代码"}), 400
try:
profile = get_contract_profile(symbol)
except Exception as exc:
return jsonify({"error": str(exc)}), 500
if not profile:
return jsonify({"error": "未查询到合约简介"}), 404
return jsonify(profile)
@app.route("/fees", methods=["GET", "POST"])
@login_required
@require_nav("fees")
def fees():
from trading_context import get_trading_mode
from ctp_fee_worker import (
schedule_ctp_fee_sync,
get_fee_last_sync,
fees_synced_today,
fee_sync_in_progress,
)
from vnpy_bridge import ctp_status
mode = get_trading_mode(get_setting)
if request.method == "POST":
action = request.form.get("action")
if action == "sync_ctp":
force = request.form.get("force") == "1"
_, msg = schedule_ctp_fee_sync(
mode,
get_setting=get_setting,
set_setting=set_setting,
force=force,
)
flash(msg)
return redirect(url_for("fees"))
rates = list_fee_rates_for_ui()
fee_counts = count_fee_rates_by_source()
ctp_st = ctp_status(mode)
return render_template(
"fees.html",
rates=rates,
fee_counts=fee_counts,
fee_last_sync=get_fee_last_sync(get_setting),
fee_synced_today=fees_synced_today(get_setting),
fee_sync_running=fee_sync_in_progress(),
ctp_connected=bool(ctp_st.get("connected")),
)
@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
if request.method == "POST":
action = request.form.get("action")
if action == "wechat":
webhook = request.form.get("wechat_webhook", "").strip()
set_setting("wechat_webhook", webhook)
flash("企业微信配置已保存")
elif action == "trading":
mode = request.form.get("trading_mode", "simulation").strip()
if mode not in ("simulation", "live"):
mode = "simulation"
sizing = request.form.get("position_sizing_mode", "fixed").strip()
if sizing == "risk":
sizing = "amount"
if sizing not in ("fixed", "amount"):
sizing = "fixed"
set_setting("trading_mode", mode)
set_setting("position_sizing_mode", sizing)
try:
fl = int(float(request.form.get("fixed_lots", "1") or 1))
set_setting("fixed_lots", str(max(1, fl)))
except ValueError:
flash("固定手数无效")
return redirect(url_for("settings"))
try:
fa = float(request.form.get("fixed_amount", "5000") or 5000)
set_setting("fixed_amount", str(max(1.0, fa)))
except ValueError:
flash("固定金额无效")
return redirect(url_for("settings"))
try:
rp = float(request.form.get("risk_percent", "1") or 1)
set_setting("risk_percent", str(max(0.1, min(100.0, rp))))
except ValueError:
pass
try:
mp = float(request.form.get("max_margin_pct", "30") or 30)
set_setting("max_margin_pct", str(max(1.0, min(100.0, mp))))
except ValueError:
flash("保证金比例无效")
return redirect(url_for("settings"))
try:
tb = int(float(request.form.get("trailing_be_tick_buffer", "2") or 2))
set_setting("trailing_be_tick_buffer", str(max(1, min(20, tb))))
except ValueError:
flash("移动保本缓冲无效")
return redirect(url_for("settings"))
try:
pt = int(float(request.form.get("pending_order_timeout_min", "5") or 5))
set_setting("pending_order_timeout_min", str(max(1, min(60, pt))))
except ValueError:
flash("挂单超时无效")
return redirect(url_for("settings"))
flash("交易模式已保存")
elif action == "ctp":
from ctp_settings import save_ctp_settings_from_form
save_result = save_ctp_settings_from_form(request.form, set_setting)
pwd_updated = save_result.get("passwords_updated") or []
pwd_empty = save_result.get("passwords_submitted_empty") or []
simnow_pwd_len = len((request.form.get("simnow_password") or "").strip())
live_pwd_len = len((request.form.get("ctp_live_password") or "").strip())
print(
f"CTP settings save: simnow_password_len={simnow_pwd_len} "
f"live_password_len={live_pwd_len} updated={pwd_updated}",
flush=True,
)
app.logger.info(
"CTP settings save: simnow_password_len=%s live_password_len=%s updated=%s",
simnow_pwd_len,
live_pwd_len,
pwd_updated,
)
if "simnow_password" in pwd_updated:
pwd_note = f"SimNow 交易密码已更新({simnow_pwd_len} 位)"
elif "simnow_password" in pwd_empty:
pwd_note = "SimNow 交易密码未改:提交为空,请在「交易密码」框手打后再保存"
elif "ctp_live_password" in pwd_updated:
pwd_note = "实盘交易密码已更新"
elif "ctp_live_password" in pwd_empty:
pwd_note = "实盘交易密码未改(提交为空)"
else:
pwd_note = ""
flash_msg = "CTP 配置已保存,正在使用新地址重连…"
if pwd_note:
flash_msg = f"CTP 配置已保存;{pwd_note},正在重连…"
try:
from vnpy_bridge import get_bridge
from trading_context import get_trading_mode
b = get_bridge()
if pwd_updated:
b._clear_login_cooldown()
mode = get_trading_mode(get_setting)
info = b.reconnect_after_settings_saved(mode)
if info.get("cooldown"):
flash_msg = f"CTP 配置已保存;{pwd_note or '请稍后再连'}"
elif not info.get("started") and info.get("connected"):
flash_msg = f"CTP 配置已保存;{pwd_note or '当前连接正常'}"
except Exception as exc:
app.logger.warning("CTP reconnect after settings save: %s", exc)
flash_msg = f"CTP 配置已保存;{pwd_note or '请稍后在持仓监控页重连'}"
flash(flash_msg)
elif action == "nav":
items = {k: request.form.get(f"nav_{k}") == "on" for k in NAV_TOGGLES}
save_nav_items(set_setting, items)
flash("导航显示已保存")
elif action == "password":
old_p = request.form.get("old_password", "")
new_p = request.form.get("new_password", "")
new_p2 = request.form.get("new_password2", "")
admin_hash = get_setting("admin_password_hash")
if not check_password_hash(admin_hash, old_p):
flash("原密码错误")
elif len(new_p) < 6:
flash("新密码至少 6 位")
elif new_p != new_p2:
flash("两次新密码不一致")
else:
set_setting("admin_password_hash", generate_password_hash(new_p))
flash("密码修改成功")
return redirect(url_for("settings"))
webhook = get_setting("wechat_webhook")
username = get_setting("admin_username")
ctp_st = {}
try:
from vnpy_bridge import ctp_status
from trading_context import get_trading_mode
ctp_st = ctp_status(get_trading_mode(get_setting))
except Exception:
pass
from ctp_settings import get_ctp_settings_for_ui
return render_template(
"settings.html",
webhook=webhook,
username=username,
quote_label=get_quote_source_label(ctp_connected=bool(ctp_st.get("connected"))),
ctp_status=ctp_st,
ctp_cfg=get_ctp_settings_for_ui(),
trading_mode=get_setting("trading_mode", "simulation"),
position_sizing_mode=get_setting("position_sizing_mode", "fixed"),
fixed_lots=get_setting("fixed_lots", "1"),
fixed_amount=get_setting("fixed_amount", "5000"),
risk_percent=get_setting("risk_percent", "1"),
max_margin_pct=get_setting("max_margin_pct", "30"),
trailing_be_tick_buffer=get_setting("trailing_be_tick_buffer", "2"),
pending_order_timeout_min=get_setting("pending_order_timeout_min", "5"),
nav_items=get_nav_items(get_setting),
nav_toggles=NAV_TOGGLES,
)
install_trading(
app,
login_required=login_required,
require_nav=require_nav,
get_db=get_db,
get_setting=get_setting,
set_setting=set_setting,
fetch_price=fetch_price,
send_wechat_msg=send_wechat_msg,
)
try_init_vnpy({})
start_background_threads()
# —————————————— 启动 ——————————————
if __name__ == "__main__":
app.run(host=HOST, port=PORT, debug=DEBUG, threaded=True)