# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md import os from locale_fix import ensure_process_locale ensure_process_locale() import sqlite3 import time import threading import requests from datetime import date, datetime, timedelta from typing import Optional from functools import wraps from zoneinfo import ZoneInfo from werkzeug.utils import secure_filename from dotenv import load_dotenv from flask import ( Flask, render_template, request, redirect, url_for, flash, session, jsonify, Response, stream_with_context, ) from werkzeug.security import check_password_hash, generate_password_hash from functools import wraps from symbols import ( search_symbols, ths_to_codes, list_main_contracts_grouped, list_recommended_symbols_grouped, refresh_main_index, ) from contract_specs import calc_position_metrics from fee_specs import ( calc_fee_breakdown, calc_round_trip_fee, list_fee_rates_for_ui, count_fee_rates_by_source, purge_non_ctp_fee_rates, ) from nav_settings import NAV_TOGGLES, get_nav_items, nav_enabled, save_nav_items from stats_engine import ( STATS_VIEWS, build_all_stats, get_calendar_day, get_calendar_month, load_stats_cache, refresh_stats_cache, ) from kline_store import ensure_kline_tables from kline_stream import kline_hub, sse_format from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label from db_conn import connect_db from admin_settings import save_admin_credentials from db_backup import ( backup_dir, backup_in_progress, default_restore_dir, get_backup_last_at, list_backups, resolve_backup_file, schedule_backup, start_backup_worker, ) from strategy.strategy_db import init_strategy_tables from install_trading import install_trading from vnpy_bridge import try_init_vnpy load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")) app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "futures_monitor_default_secret") HOST = os.getenv("HOST", "0.0.0.0") PORT = int(os.getenv("PORT", "6600")) DEBUG = os.getenv("DEBUG", "false").lower() in ("1", "true", "yes") DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db") UPLOAD_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "uploads") TZ = ZoneInfo("Asia/Shanghai") OPEN_TYPES = ["突破开仓", "回调开仓", "追涨杀跌", "计划内开仓", "震荡摸顶底", "其他"] EXIT_TRIGGERS = ["止盈", "止损", "手工平仓", "移动止损", "时间离场", "其他"] BEHAVIOR_TAGS = ["怕踏空", "报复开仓", "盈利飘了", "拿不住单", "扛单", "重仓违规"] KLINE_PERIODS = ["1m", "3m", "5m", "15m", "30m", "1h", "4h", "1d"] KLINE_CUTOFFS = ["平仓时间", "开仓时间", "当前时间"] def today_str() -> str: return datetime.now(TZ).date().isoformat() def calc_holding_duration(open_time: str, close_time: str) -> str: try: o = datetime.fromisoformat(open_time.strip().replace(" ", "T")[:19]) c = datetime.fromisoformat(close_time.strip().replace(" ", "T")[:19]) delta = c - o if delta.total_seconds() < 0: return "" secs = int(delta.total_seconds()) h, rem = divmod(secs, 3600) m, _ = divmod(rem, 60) if h: return f"{h}小时{m}分钟" return f"{m}分钟" except Exception: return "" def holding_to_minutes(open_time: str, close_time: str) -> int: try: o = datetime.fromisoformat(open_time.strip().replace(" ", "T")) c = datetime.fromisoformat(close_time.strip().replace(" ", "T")) secs = int((c - o).total_seconds()) return max(0, secs // 60) except Exception: return 0 def classify_close_result(direction: str, close: float, sl: float, tp: float) -> str: """根据平仓价与止损/止盈距离判断结果。""" if close is None: return "手动平仓" tol = max(abs(close) * 0.002, 1.0) if abs(close - tp) <= tol: return "止盈" if abs(close - sl) <= tol: return "止损" return "手动平仓" def calc_rr_ratio(direction: str, entry: float, stop: float, target: float) -> Optional[float]: """盈亏比 = 盈利空间 / 风险空间。""" if entry is None or stop is None or target is None: return None if direction == "long": risk = entry - stop if risk <= 0: return None return round((target - entry) / risk, 2) if direction == "short": risk = stop - entry if risk <= 0: return None return round((entry - target) / risk, 2) return None def calc_theoretical_pnl(direction: str, entry: float, target: float, lots: float) -> Optional[float]: if entry is None or target is None or lots is None: return None if direction == "long": return round((target - entry) * lots, 2) if direction == "short": return round((entry - target) * lots, 2) return None def parse_review_date_filter(preset: str, start: str, end: str) -> tuple[str, str]: today = datetime.now(TZ).date() if preset == "today": s = today.isoformat() return s, s if preset == "week": monday = today - timedelta(days=today.weekday()) return monday.isoformat(), today.isoformat() if preset == "month": return today.replace(day=1).isoformat(), today.isoformat() return start.strip(), end.strip() def expire_old_plans(): """当日结束后计划自动失效,保留历史。""" today = today_str() conn = get_db() conn.execute( "UPDATE order_plans SET status='expired' WHERE plan_date < ? AND status IN ('planned', 'active')", (today,), ) conn.execute( "UPDATE order_plans SET plan_date=date(created_at) WHERE plan_date IS NULL OR plan_date=''" ) conn.commit() conn.close() def get_db(): return connect_db() def get_setting(key: str, default: str = "") -> str: conn = get_db() row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone() conn.close() return row["value"] if row else default def set_setting(key: str, value: str): conn = get_db() conn.execute( "INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=?", (key, value, value), ) conn.commit() conn.close() def require_nav(key: str): """导航项关闭时拒绝访问对应页面。""" def decorator(f): @wraps(f) def wrapped(*args, **kwargs): if not nav_enabled(get_setting, key): flash("该页面已在系统设置中关闭") return redirect(url_for("positions")) return f(*args, **kwargs) return wrapped return decorator def _static_asset_v() -> str: base = os.path.dirname(os.path.abspath(__file__)) rels = ( "static/js/trade.js", "static/js/dashboard.js", "static/js/orientation.js", "static/css/records.css", "static/js/records.js", "static/js/settings.js", "static/css/mobile.css", "static/css/responsive.css", "static/css/trade.css", "static/css/dashboard.css", "static/css/doc.css", "static/css/base.css", ) mtimes = [] for rel in rels: path = os.path.join(base, rel.replace("/", os.sep)) if os.path.isfile(path): mtimes.append(os.path.getmtime(path)) return str(int(max(mtimes))) if mtimes else "0" def _ua_is_phone(ua: str) -> bool: ua_l = (ua or "").lower() if "ipad" in ua_l: return False if "android" in ua_l and "mobile" not in ua_l: return False if any(x in ua_l for x in ("iphone", "ipod", "windows phone", "iemobile")): return True if "android" in ua_l and "mobile" in ua_l: return True if "mobile" in ua_l or "harmonyos" in ua_l or "openharmony" in ua_l: return True return False @app.context_processor def inject_globals(): return {"nav_items": get_nav_items(get_setting), "asset_v": _static_asset_v()} def _trading_mode() -> str: return (get_setting("trading_mode", "simulation") or "simulation").strip() def touch_stats_cache(): try: conn = get_db() capital = float(get_setting("live_capital", "0") or 0) refresh_stats_cache(conn, capital) conn.close() except Exception as exc: app.logger.warning("stats cache refresh failed: %s", exc) def get_stats_data() -> dict: conn = get_db() try: capital = float(get_setting("live_capital", "0") or 0) data = load_stats_cache(conn) if data: return data try: return refresh_stats_cache(conn, capital) except sqlite3.OperationalError as exc: if "locked" not in str(exc).lower(): raise app.logger.warning("stats cache refresh locked, compute without save: %s", exc) return build_all_stats(conn, capital) finally: conn.close() def init_db(): conn = get_db() c = conn.cursor() c.execute("CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value TEXT)") c.execute('''CREATE TABLE IF NOT EXISTS order_plans (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, direction TEXT, zone_upper REAL, zone_lower REAL, stop_loss REAL, take_profit REAL, status TEXT DEFAULT "planned", triggered_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS key_monitors (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT, upper REAL, lower REAL, upper_triggered INTEGER DEFAULT 0, lower_triggered INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS trade_records (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT, trigger_price REAL, stop_loss REAL, take_profit REAL, result TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') migrations = [ "ALTER TABLE key_monitors ADD COLUMN symbol_name TEXT", "ALTER TABLE key_monitors ADD COLUMN upper_triggered INTEGER DEFAULT 0", "ALTER TABLE key_monitors ADD COLUMN lower_triggered INTEGER DEFAULT 0", "ALTER TABLE trade_records ADD COLUMN symbol_name TEXT", "ALTER TABLE order_plans ADD COLUMN sina_code TEXT", "ALTER TABLE order_plans ADD COLUMN market_code TEXT", "ALTER TABLE key_monitors ADD COLUMN market_code TEXT", "ALTER TABLE key_monitors ADD COLUMN sina_code TEXT", "ALTER TABLE trade_records ADD COLUMN market_code TEXT", "ALTER TABLE order_plans ADD COLUMN plan_date TEXT", "ALTER TABLE order_plans ADD COLUMN decision_reason TEXT", "ALTER TABLE key_monitors ADD COLUMN status TEXT DEFAULT 'active'", "ALTER TABLE key_monitors ADD COLUMN archived_at TEXT", "ALTER TABLE key_monitors ADD COLUMN trade_mode TEXT DEFAULT '顺势'", "ALTER TABLE key_monitors ADD COLUMN risk_reward REAL DEFAULT 2", "ALTER TABLE key_monitors ADD COLUMN trailing_be INTEGER DEFAULT 0", "ALTER TABLE key_monitors ADD COLUMN last_trigger_bar TEXT", "ALTER TABLE key_monitors ADD COLUMN alert_push_count INTEGER DEFAULT 0", "ALTER TABLE key_monitors ADD COLUMN alert_last_push_at TEXT", "ALTER TABLE key_monitors ADD COLUMN alert_break_side TEXT", "ALTER TABLE key_monitors ADD COLUMN breakout_bar_time TEXT", "ALTER TABLE key_monitors ADD COLUMN alert_close_price REAL", "ALTER TABLE key_monitors ADD COLUMN bar_period TEXT DEFAULT '5m'", "ALTER TABLE review_records ADD COLUMN direction TEXT", "ALTER TABLE review_records ADD COLUMN entry_price REAL", "ALTER TABLE review_records ADD COLUMN stop_loss REAL", "ALTER TABLE review_records ADD COLUMN take_profit REAL", "ALTER TABLE review_records ADD COLUMN close_price REAL", "ALTER TABLE review_records ADD COLUMN lots REAL", "ALTER TABLE review_records ADD COLUMN holding_duration TEXT", "ALTER TABLE review_records ADD COLUMN initial_pnl REAL", "ALTER TABLE review_records ADD COLUMN actual_pnl REAL", "ALTER TABLE review_records ADD COLUMN is_emotion INTEGER DEFAULT 0", "ALTER TABLE review_records ADD COLUMN symbol_name TEXT", "ALTER TABLE review_records ADD COLUMN market_code TEXT", "ALTER TABLE review_records ADD COLUMN sina_code TEXT", "ALTER TABLE trade_logs ADD COLUMN fee REAL", "ALTER TABLE trade_logs ADD COLUMN pnl_net REAL", "ALTER TABLE trade_logs ADD COLUMN margin_pct REAL", "ALTER TABLE trade_logs ADD COLUMN equity_after REAL", "ALTER TABLE review_records ADD COLUMN fee REAL", "ALTER TABLE review_records ADD COLUMN pnl_net REAL", ] for sql in migrations: try: c.execute(sql) except sqlite3.OperationalError: pass c.execute('''CREATE TABLE IF NOT EXISTS review_records (id INTEGER PRIMARY KEY AUTOINCREMENT, open_time TEXT, close_time TEXT, symbol TEXT, timeframe TEXT, pnl REAL, open_type TEXT, expected_rr REAL, actual_rr REAL, exit_trigger TEXT, exit_supplement TEXT, watch_after_breakeven TEXT, new_position_while_occupied TEXT, screenshot TEXT, auto_kline INTEGER DEFAULT 0, kline_period1 TEXT, kline_period2 TEXT, kline_count INTEGER, kline_cutoff TEXT, behavior_tags TEXT, notes TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS position_monitors (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, market_code TEXT, sina_code TEXT, direction TEXT, lots REAL, entry_price REAL, stop_loss REAL, take_profit REAL, open_time TEXT, status TEXT DEFAULT 'active', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS trade_logs (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, market_code TEXT, sina_code TEXT, monitor_type TEXT, direction TEXT, entry_price REAL, stop_loss REAL, take_profit REAL, close_price REAL, lots REAL, margin REAL, holding_minutes INTEGER, open_time TEXT, close_time TEXT, pnl REAL, result TEXT, verified INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS fee_rates (product TEXT PRIMARY KEY, exchange TEXT, mult INTEGER, open_fixed REAL DEFAULT 0, open_ratio REAL DEFAULT 0, close_yesterday_fixed REAL DEFAULT 0, close_yesterday_ratio REAL DEFAULT 0, close_today_fixed REAL DEFAULT 0, close_today_ratio REAL DEFAULT 0, updated_at TEXT)''') c.execute('''CREATE TABLE IF NOT EXISTS stats_cache (key TEXT PRIMARY KEY, data_json TEXT NOT NULL, updated_at TEXT NOT NULL)''') for sql in ( "ALTER TABLE fee_rates ADD COLUMN source TEXT DEFAULT 'local'", ): try: c.execute(sql) except sqlite3.OperationalError: pass ensure_kline_tables(conn) init_strategy_tables(conn) from risk.account_risk_lib import ensure_account_risk_schema from recommend_store import ensure_recommend_tables ensure_account_risk_schema(conn) ensure_recommend_tables(conn) conn.commit() conn.close() sync_admin_from_env() if not get_setting("wechat_webhook") and os.getenv("WECHAT_WEBHOOK"): set_setting("wechat_webhook", os.getenv("WECHAT_WEBHOOK")) if not get_setting("ths_refresh_token") and os.getenv("THS_REFRESH_TOKEN"): set_setting("ths_refresh_token", os.getenv("THS_REFRESH_TOKEN")) from ctp_settings import seed_ctp_settings_from_env seed_ctp_settings_from_env(set_setting) os.makedirs(UPLOAD_DIR, exist_ok=True) expire_old_plans() if not get_setting("fee_multiplier"): set_setting("fee_multiplier", "2") if not get_setting("trading_mode"): set_setting("trading_mode", "simulation") if not get_setting("position_sizing_mode"): set_setting("position_sizing_mode", "fixed") if not get_setting("fixed_lots"): set_setting("fixed_lots", "1") if not get_setting("fixed_amount"): set_setting("fixed_amount", "5000") if not get_setting("risk_percent"): set_setting("risk_percent", "1") if not get_setting("max_margin_pct"): set_setting("max_margin_pct", "30") if not get_setting("roll_max_margin_pct"): set_setting("roll_max_margin_pct", "50") if not get_setting("trailing_be_tick_buffer"): set_setting("trailing_be_tick_buffer", "2") if not get_setting("pending_order_timeout_min"): set_setting("pending_order_timeout_min", "5") if not get_setting("ai_enabled"): set_setting("ai_enabled", "0") if not get_setting("ai_provider"): set_setting("ai_provider", "ollama") if not get_setting("ai_ollama_base_url"): set_setting("ai_ollama_base_url", "http://127.0.0.1:11434") if not get_setting("ai_ollama_model"): set_setting("ai_ollama_model", "qwen2.5:7b") if not get_setting("ai_openai_base_url"): set_setting("ai_openai_base_url", "https://api.openai.com/v1") if not get_setting("ai_openai_model"): set_setting("ai_openai_model", "gpt-4o-mini") if not get_setting("ai_daily_report_enabled"): set_setting("ai_daily_report_enabled", "1") if not get_setting("ai_daily_report_hour"): set_setting("ai_daily_report_hour", "15") if not get_setting("ai_daily_report_minute"): set_setting("ai_daily_report_minute", "5") if not get_setting("backup_auto_enabled"): set_setting("backup_auto_enabled", "1") if not get_setting("backup_auto_hour"): set_setting("backup_auto_hour", "3") if not get_setting("backup_keep_count"): set_setting("backup_keep_count", "30") if not get_setting("fee_source_mode"): set_setting("fee_source_mode", "ctp") set_setting("fee_source_mode", "ctp") try: purge_non_ctp_fee_rates() except Exception: pass def sync_admin_from_env(): """ 从 .env 同步管理员账号。 - 首次建库:自动写入 ADMIN_USERNAME / ADMIN_PASSWORD - 已建库后改 .env:需设 ADMIN_SYNC_FROM_ENV=true 并重启服务 """ sync = os.getenv("ADMIN_SYNC_FROM_ENV", "false").lower() in ("1", "true", "yes") env_username = os.getenv("ADMIN_USERNAME", "").strip() env_password = os.getenv("ADMIN_PASSWORD", "").strip() placeholder_passwords = {"", "change-me-on-first-login", "admin123"} if not get_setting("admin_username"): username = env_username or "admin" password = env_password if env_password not in placeholder_passwords else "admin123" set_setting("admin_username", username) set_setting("admin_password_hash", generate_password_hash(password)) return if not sync: return if env_username: set_setting("admin_username", env_username) if env_password and env_password not in placeholder_passwords: set_setting("admin_password_hash", generate_password_hash(env_password)) init_db() def sync_ths_token(): set_ths_refresh_token(get_setting("ths_refresh_token")) sync_ths_token() def build_market_quote_payload( symbol: str, market_code: str = "", sina_code: str = "", *, prefer_sina: bool = False, ) -> dict: if not market_code or not sina_code: codes = ths_to_codes(symbol) if codes: market_code = codes.get("market_code", "") or market_code sina_code = codes.get("sina_code", "") or sina_code quote_source = "sina" price = None prev_close = None if not prefer_sina: try: from vnpy_bridge import ctp_status, ctp_get_tick_detail from trading_context import get_trading_mode mode = get_trading_mode(get_setting) if ctp_status(mode).get("connected"): detail = ctp_get_tick_detail(mode, symbol) if detail.get("price"): price = detail["price"] quote_source = "ctp" if detail.get("pre_close") is not None: prev_close = detail["pre_close"] except Exception: pass if price is None: price = fetch_price(symbol, market_code, sina_code) name = symbol codes = ths_to_codes(symbol) if codes: name = codes.get("name", symbol) if prev_close is None and sina_code: from market import fetch_raw_for_volume raw = fetch_raw_for_volume(sina_code) if raw and raw.get("prev_close") is not None: prev_close = raw["prev_close"] return { "symbol": symbol, "name": name, "price": price, "prev_close": prev_close, "quote_source": quote_source, } # —————————————— 推送 —————————————— def send_wechat_msg(content: str): webhook = get_setting("wechat_webhook") if not webhook: return full = f"【国内期货】\n{content}" data = {"msgtype": "text", "text": {"content": full}} try: requests.post(webhook, json=data, timeout=10) except Exception: pass # —————————————— 行情 —————————————— def resolve_market_codes(ths_code: str, market_code: str = "", sina_code: str = "") -> tuple[str, str]: """返回 (market_code, sina_code) 用于行情拉取。""" if market_code: return market_code, sina_code if sina_code and "." in sina_code: return sina_code, "" codes = ths_to_codes(ths_code) if codes: return codes["market_code"], codes["sina_code"] if ths_code.startswith("nf_") or ths_code.startswith("CFF_RE_"): return ths_code, ths_code return "", sina_code or "" def fetch_price(ths_code: str, market_code: str = "", sina_code: str = "") -> Optional[float]: sym = (ths_code or "").strip() if sym: try: from vnpy_bridge import ctp_status, ctp_get_tick_price from trading_context import get_trading_mode mode = get_trading_mode(get_setting) if ctp_status(mode).get("connected"): p = ctp_get_tick_price(mode, sym) if p and p > 0: return p except Exception: pass mc, sc = resolve_market_codes(sym, market_code, sina_code) if not mc and not sc: return None return market_get_price(mc, sc) # —————————————— 监控逻辑 —————————————— def check_order_plans(): expire_old_plans() today = today_str() conn = get_db() rows = conn.execute( "SELECT * FROM order_plans WHERE plan_date=? AND status IN ('planned', 'active')", (today,), ).fetchall() for r in rows: sym = r["symbol"] sina = r["sina_code"] if "sina_code" in r.keys() else "" market = r["market_code"] if "market_code" in r.keys() else "" p = fetch_price(sym, market, sina) if not p: continue direction = r["direction"] zone_upper = r["zone_upper"] zone_lower = r["zone_lower"] stop_loss = r["stop_loss"] take_profit = r["take_profit"] status = r["status"] pid = r["id"] name = r["symbol_name"] or sym reason = r["decision_reason"] if "decision_reason" in r.keys() and r["decision_reason"] else "—" # 计划状态:价格进入决策区间则激活并通知 if status == "planned": in_zone = zone_lower <= p <= zone_upper if in_zone: msg = ( f"【开单计划触发】{name} ({sym})\n" f"方向:{'做多' if direction == 'long' else '做空'}\n" f"决策区间:{zone_lower} ~ {zone_upper}\n" f"决策理由:{reason}\n" f"当前价:{p}\n" f"止损:{stop_loss} 止盈:{take_profit}" ) send_wechat_msg(msg) conn.execute( "UPDATE order_plans SET status='active', triggered_at=? WHERE id=?", (datetime.now().isoformat(), pid), ) status = "active" # 激活状态:监控止盈止损 if status == "active": res = None if direction == "long": if p >= take_profit: res = "止盈" elif p <= stop_loss: res = "止损" elif direction == "short": if p <= take_profit: res = "止盈" elif p >= stop_loss: res = "止损" if res: msg = ( f"[{'做多' if direction == 'long' else '做空'}] {name} 已{res}\n" f"决策区间:{zone_lower} ~ {zone_upper}\n" f"止损:{stop_loss} 止盈:{take_profit}\n" f"当前价:{p}" ) send_wechat_msg(msg) conn.execute( """INSERT INTO trade_records (symbol, symbol_name, monitor_type, direction, trigger_price, stop_loss, take_profit, result) VALUES (?,?,?,?,?,?,?,?)""", (sym, name, "开单计划", direction, p, stop_loss, take_profit, res), ) conn.execute( "UPDATE order_plans SET status='closed' WHERE id=?", (pid,) ) conn.commit() conn.close() def check_key_monitors(): from db_conn import DB_PATH from key_monitor_lib import run_key_monitor_check from trading_context import get_trading_mode conn = get_db() try: execute_fn = getattr(app, "_execute_key_breakout", None) run_key_monitor_check( conn, db_path=DB_PATH, get_trading_mode_fn=lambda: get_trading_mode(get_setting), send_wechat=send_wechat_msg, execute_breakout_fn=execute_fn, ) conn.commit() finally: conn.close() def background_task(): while True: try: expire_old_plans() check_key_monitors() fn_roll = getattr(app, "_check_roll_monitors", None) if fn_roll: fn_roll() check_order_plans() fn = getattr(app, "_check_trend_plans", None) if fn: fn(app) except Exception: pass time.sleep(3) def start_background_threads(): from trading_context import get_trading_mode threading.Thread(target=background_task, daemon=True).start() threading.Thread( target=lambda: kline_hub.worker_loop( DB_PATH, lambda sym, mc, sc: build_market_quote_payload( sym, mc, sc, prefer_sina=True, ), get_mode_fn=lambda: get_trading_mode(get_setting), ), daemon=True, ).start() threading.Thread(target=refresh_main_index, daemon=True).start() start_backup_worker(get_setting_fn=get_setting, set_setting_fn=set_setting) # —————————————— 登录 —————————————— def login_required(f): @wraps(f) def wrap(*args, **kwargs): if not session.get("logged_in"): return redirect(url_for("login")) return f(*args, **kwargs) return wrap @app.route("/") def index(): if session.get("logged_in"): return redirect(url_for("positions")) return redirect(url_for("login")) @app.route("/manifest.webmanifest") def web_manifest(): import json manifest_path = os.path.join(app.static_folder, "manifest.json") with open(manifest_path, encoding="utf-8") as fh: data = json.load(fh) if _ua_is_phone(request.headers.get("User-Agent", "")): data["orientation"] = "portrait-primary" else: data["orientation"] = "any" response = app.make_response(json.dumps(data, ensure_ascii=False)) response.mimetype = "application/manifest+json" response.headers["Cache-Control"] = "no-cache" return response @app.route("/sw.js") def service_worker(): response = app.send_static_file("sw.js") response.headers["Cache-Control"] = "no-cache" response.headers["Service-Worker-Allowed"] = "/" return response @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": u = request.form.get("username", "").strip() p = request.form.get("password", "") admin_u = get_setting("admin_username") admin_hash = get_setting("admin_password_hash") if u == admin_u and check_password_hash(admin_hash, p): session["logged_in"] = True session["username"] = u return redirect(url_for("positions")) flash("账号或密码错误") return render_template("login.html") @app.route("/logout") def logout(): session.clear() return redirect(url_for("login")) # —————————————— API —————————————— @app.route("/api/symbols/search") @login_required def api_symbol_search(): q = request.args.get("q", "") conn = get_db() try: from trading_context import get_account_capital, is_ctp_connected capital = get_account_capital(conn, get_setting) ctp_connected = is_ctp_connected(get_setting) finally: conn.close() return jsonify(search_symbols(q, capital=capital, ctp_connected=ctp_connected)) @app.route("/api/symbols/mains") @login_required def api_symbols_mains(): return jsonify(list_main_contracts_grouped()) @app.route("/api/symbols/recommended") @login_required def api_symbols_recommended(): """品种下拉:仅展示当前资金下可开仓品种(与下方可开仓品种表一致)。""" from recommend_store import recommend_payload from trading_context import ( get_fixed_lots, get_max_margin_pct, get_recommend_capital, get_sizing_mode, get_trading_mode, ) conn = get_db() try: capital = get_recommend_capital(conn, get_setting) payload = recommend_payload( conn, live_capital=capital, max_margin_pct=get_max_margin_pct(get_setting), trading_mode=get_trading_mode(get_setting), sizing_mode=get_sizing_mode(get_setting), fixed_lots=get_fixed_lots(get_setting), ) return jsonify(list_recommended_symbols_grouped(payload.get("rows") or [])) finally: conn.close() @app.route("/api/key_prices") @login_required def api_key_prices(): """关键位监控列表:批量现价与距上/下沿距离。""" conn = get_db() rows = conn.execute( "SELECT id, symbol, market_code, sina_code, upper, lower " "FROM key_monitors WHERE status='active' OR status IS NULL" ).fetchall() conn.close() out = [] for r in rows: sym = r["symbol"] market = r["market_code"] or "" sina = r["sina_code"] or "" upper = float(r["upper"]) lower = float(r["lower"]) price = fetch_price(sym, market, sina) dist_upper = None dist_lower = None if price is not None: dist_upper = round(upper - price, 2) dist_lower = round(price - lower, 2) out.append({ "id": r["id"], "price": price, "dist_upper": dist_upper, "dist_lower": dist_lower, }) return jsonify(out) @app.route("/api/plan_prices") @login_required def api_plan_prices(): """今日计划:批量现价与距决策区间上/下沿距离。""" today = today_str() conn = get_db() rows = conn.execute( "SELECT id, symbol, market_code, sina_code, zone_upper, zone_lower " "FROM order_plans WHERE plan_date=? AND status IN ('planned', 'active')", (today,), ).fetchall() conn.close() out = [] for r in rows: sym = r["symbol"] market = r["market_code"] or "" sina = r["sina_code"] or "" upper = float(r["zone_upper"]) lower = float(r["zone_lower"]) price = fetch_price(sym, market, sina) dist_upper = None dist_lower = None in_zone = False if price is not None: dist_upper = round(upper - price, 2) dist_lower = round(price - lower, 2) in_zone = lower <= price <= upper out.append({ "id": r["id"], "price": price, "dist_upper": dist_upper, "dist_lower": dist_lower, "in_zone": in_zone, }) return jsonify(out) @app.route("/api/position_live") @login_required def api_position_live(): capital = float(get_setting("live_capital", "0") or 0) now_iso = datetime.now(TZ).strftime("%Y-%m-%dT%H:%M") conn = get_db() rows = conn.execute( "SELECT * FROM position_monitors WHERE status='active' ORDER BY id DESC" ).fetchall() conn.close() out = [] for r in rows: sym = r["symbol"] market = r["market_code"] or "" sina = r["sina_code"] or "" direction = r["direction"] entry = float(r["entry_price"]) sl = float(r["stop_loss"]) tp = float(r["take_profit"]) lots = float(r["lots"] or 1) mark = fetch_price(sym, market, sina) metrics = calc_position_metrics( direction, entry, sl, tp, lots, mark, capital, sym, ) holding = calc_holding_duration(r["open_time"] or "", now_iso) close_est = mark if mark is not None else entry fee_info = calc_fee_breakdown( sym, entry, close_est, lots, r["open_time"] or "", now_iso, trading_mode=_trading_mode(), ) est_net = None if metrics.get("float_pnl") is not None: est_net = round(metrics["float_pnl"] - fee_info["total_fee"], 2) out.append({ "id": r["id"], "symbol": r["symbol_name"] or sym, "symbol_code": sym, "direction": "做多" if direction == "long" else "做空", "lots": lots, "entry_price": entry, "stop_loss": sl, "take_profit": tp, "open_time": r["open_time"], "mark_price": mark, "holding_duration": holding, "est_fee": fee_info["total_fee"], "est_fee_open": fee_info["open_fee"], "est_fee_close": fee_info["close_fee"], "est_fee_close_type": fee_info["close_type"], "est_pnl_net": est_net, **metrics, }) return jsonify(out) @app.route("/plans") @login_required @require_nav("plans") def plans(): today = today_str() start = request.args.get("start", "") end = request.args.get("end", "") conn = get_db() plan_list = conn.execute( "SELECT * FROM order_plans WHERE plan_date=? AND status IN ('planned', 'active') ORDER BY id DESC", (today,), ).fetchall() sql = "SELECT * FROM order_plans WHERE plan_date < ? OR status IN ('closed', 'expired')" params: list = [today] if start: sql += " AND plan_date >= ?" params.append(start) if end: sql += " AND plan_date <= ?" params.append(end) sql += " ORDER BY plan_date DESC, id DESC LIMIT 200" history = conn.execute(sql, params).fetchall() conn.close() return render_template( "plans.html", plans=plan_list, history=history, today=today, start=start, end=end, ) @app.route("/add_plan", methods=["POST"]) @login_required def add_plan(): d = request.form direction = d.get("direction") symbol = d.get("symbol", "").strip() symbol_name = d.get("symbol_name", "").strip() market_code = d.get("market_code", "").strip() sina_code = d.get("sina_code", "").strip() if not direction: flash("请选择多空方向") return redirect(url_for("plans")) if not symbol or not market_code: flash("请从下拉列表选择品种(同花顺合约代码)") return redirect(url_for("plans")) conn = get_db() conn.execute( """INSERT INTO order_plans (symbol, symbol_name, market_code, sina_code, direction, zone_upper, zone_lower, stop_loss, take_profit, plan_date, decision_reason) VALUES (?,?,?,?,?,?,?,?,?,?,?)""", ( symbol, symbol_name, market_code, sina_code, direction, float(d["zone_upper"]), float(d["zone_lower"]), float(d["stop_loss"]), float(d["take_profit"]), today_str(), d.get("decision_reason", "").strip(), ), ) conn.commit() conn.close() flash("开单计划已添加") return redirect(url_for("plans")) @app.route("/del_plan/") @login_required def del_plan(pid): conn = get_db() conn.execute("DELETE FROM order_plans WHERE id=?", (pid,)) conn.commit() conn.close() flash("已删除") return redirect(url_for("plans")) @app.route("/ai") @login_required @require_nav("ai") def ai_messages_page(): from ai_messages import list_ai_messages conn = get_db() try: messages = list_ai_messages(conn, limit=100) finally: conn.close() return render_template("ai_messages.html", messages=messages) @app.route("/keys") @login_required def keys(): from key_monitor_lib import key_monitor_periods conn = get_db() key_list = conn.execute( "SELECT * FROM key_monitors WHERE status='active' OR status IS NULL ORDER BY id DESC" ).fetchall() history = conn.execute( "SELECT * FROM key_monitors WHERE status='archived' ORDER BY archived_at DESC LIMIT 100" ).fetchall() conn.close() return render_template( "keys.html", keys=key_list, history=history, key_periods=key_monitor_periods(), ) @app.route("/add_key", methods=["POST"]) @login_required def add_key(): d = request.form symbol = d.get("symbol", "").strip() symbol_name = d.get("symbol_name", "").strip() market_code = d.get("market_code", "").strip() sina_code = d.get("sina_code", "").strip() monitor_type = (d.get("type") or "").strip() if not symbol or not market_code: flash("请从下拉列表选择品种(同花顺合约代码)") return redirect(url_for("keys")) try: upper = float(d.get("upper") or 0) lower = float(d.get("lower") or 0) except (TypeError, ValueError): flash("上沿/下沿价格无效") return redirect(url_for("keys")) if upper <= lower: flash("上沿必须大于下沿") return redirect(url_for("keys")) trade_mode = (d.get("trade_mode") or "顺势").strip() if trade_mode not in ("顺势", "反转"): trade_mode = "顺势" try: risk_reward = float(d.get("risk_reward") or 2) except (TypeError, ValueError): risk_reward = 2.0 risk_reward = max(0.5, min(10.0, risk_reward)) trailing_be = 1 if d.get("trailing_be") else 0 if trailing_be and risk_reward < 3: risk_reward = 3.0 from key_monitor_lib import normalize_bar_period bar_period = normalize_bar_period(d.get("bar_period") or "5m") direction = (d.get("direction") or "").strip().lower() if monitor_type == "箱体突破": if direction not in ("long", "short"): flash("箱体突破须选择上方向(做多/做空)") return redirect(url_for("keys")) else: direction = "" conn = get_db() conn.execute( """INSERT INTO key_monitors (symbol, symbol_name, market_code, sina_code, monitor_type, direction, upper, lower, trade_mode, risk_reward, trailing_be, bar_period) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( symbol, symbol_name, market_code, sina_code, monitor_type, direction, upper, lower, trade_mode, risk_reward, trailing_be, bar_period, ), ) conn.commit() conn.close() flash("关键位监控已添加") return redirect(url_for("keys")) @app.route("/add_position", methods=["POST"]) @login_required def add_position(): flash("持仓由策略交易或 CTP 自动同步,无需手工录入") return redirect(url_for("positions")) @app.route("/del_position/") @login_required def del_position(pid): return close_position(pid) @app.route("/close_position/", methods=["POST"]) @login_required def close_position(pid): conn = get_db() row = conn.execute("SELECT * FROM position_monitors WHERE id=?", (pid,)).fetchone() if not row: conn.close() flash("持仓不存在") return redirect(url_for("positions")) sym = row["symbol"] market = row["market_code"] or "" sina = row["sina_code"] or "" direction = row["direction"] entry = float(row["entry_price"]) sl = float(row["stop_loss"]) tp = float(row["take_profit"]) lots = float(row["lots"] or 1) open_time = row["open_time"] or "" close_time = datetime.now(TZ).strftime("%Y-%m-%dT%H:%M") close_price = fetch_price(sym, market, sina) if close_price is None: conn.close() flash("无法获取现价,平仓失败") return redirect(url_for("positions")) capital = float(get_setting("live_capital", "0") or 0) metrics = calc_position_metrics(direction, entry, sl, tp, lots, close_price, capital, sym) pnl = metrics.get("float_pnl") or 0.0 fee = calc_round_trip_fee(sym, entry, close_price, lots, open_time, close_time, trading_mode=_trading_mode()) pnl_net = round(pnl - fee, 2) result = classify_close_result(direction, close_price, sl, tp) minutes = holding_to_minutes(open_time, close_time) margin_pct = metrics.get("position_pct") from trade_log_lib import calc_equity_after equity_after = calc_equity_after(capital, pnl_net) conn.execute( """INSERT INTO trade_logs (symbol, symbol_name, market_code, sina_code, monitor_type, direction, entry_price, stop_loss, take_profit, close_price, lots, margin, margin_pct, holding_minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( sym, row["symbol_name"], market, sina, "持仓监控", direction, entry, sl, tp, close_price, lots, metrics["margin"], margin_pct, minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result, ), ) conn.execute("DELETE FROM position_monitors WHERE id=?", (pid,)) conn.commit() conn.close() touch_stats_cache() flash(f"已平仓,盈亏 {pnl:.2f} 元(扣费后 {pnl_net:.2f} 元),已记入交易记录") return redirect(url_for("positions")) @app.route("/trades") @login_required def trades(): return redirect(url_for("records")) @app.route("/update_trade/", methods=["POST"]) @login_required def update_trade(tid): d = request.form conn = get_db() conn.execute( """UPDATE trade_logs SET symbol_name=?, monitor_type=?, direction=?, entry_price=?, stop_loss=?, take_profit=?, close_price=?, lots=?, margin=?, holding_minutes=?, open_time=?, close_time=?, pnl=?, result=?, verified=1 WHERE id=?""", ( d.get("symbol_name", "").strip(), d.get("monitor_type", "").strip(), d.get("direction", "").strip(), float(d.get("entry_price") or 0), float(d.get("stop_loss") or 0), float(d.get("take_profit") or 0), float(d.get("close_price") or 0), float(d.get("lots") or 0), float(d.get("margin") or 0), int(d.get("holding_minutes") or 0), d.get("open_time", "").strip(), d.get("close_time", "").strip(), float(d.get("pnl") or 0), d.get("result", "").strip(), tid, ), ) conn.commit() conn.close() touch_stats_cache() flash("交易记录已核对保存") return redirect(url_for("records")) @app.route("/del_trade/") @login_required def del_trade(tid): conn = get_db() conn.execute("DELETE FROM trade_logs WHERE id=?", (tid,)) conn.commit() conn.close() touch_stats_cache() flash("已删除") return redirect(url_for("records")) @app.route("/fill_review/") @login_required def fill_review_from_trade(tid): conn = get_db() row = conn.execute("SELECT * FROM trade_logs WHERE id=?", (tid,)).fetchone() conn.close() if not row: flash("记录不存在") return redirect(url_for("records")) q = { "symbol": row["symbol"], "symbol_name": row["symbol_name"] or row["symbol"], "market_code": row["market_code"] or "", "sina_code": row["sina_code"] or "", "direction": row["direction"], "entry_price": row["entry_price"], "stop_loss": row["stop_loss"], "take_profit": row["take_profit"], "close_price": row["close_price"], "lots": row["lots"], "open_time": row["open_time"], "close_time": row["close_time"], "pnl": row["pnl"], } params = {k: v for k, v in q.items() if v is not None} return redirect(url_for("records", **params) + "#review-panel") @app.route("/del_key/") @login_required def del_key(pid): conn = get_db() conn.execute( "UPDATE key_monitors SET status='archived', archived_at=? WHERE id=?", (datetime.now(TZ).isoformat(), pid), ) conn.commit() conn.close() flash("已移入监控历史") return redirect(url_for("keys")) @app.route("/records") @login_required def records(): preset = request.args.get("preset", "") start = request.args.get("start", "") end = request.args.get("end", "") if preset: start, end = parse_review_date_filter(preset, start, end) conn = get_db() ctp_sync_info = None try: from ctp_trade_sync import sync_trade_logs_from_ctp from trading_context import get_account_capital, get_trading_mode from vnpy_bridge import ctp_status mode = get_trading_mode(get_setting) if ctp_status(mode).get("connected"): capital = get_account_capital(conn, get_setting) ctp_sync_info = sync_trade_logs_from_ctp( conn, mode, capital=capital, trading_mode=mode, ) conn.commit() except Exception as exc: app.logger.warning("ctp trade sync on records page: %s", exc) sql = "SELECT * FROM review_records WHERE 1=1" params: list = [] if start: sql += " AND date(close_time) >= ?" params.append(start) if end: sql += " AND date(close_time) <= ?" params.append(end) sql += " ORDER BY id DESC LIMIT 200" review_list = conn.execute(sql, params).fetchall() auto_list = conn.execute( "SELECT * FROM trade_records ORDER BY id DESC LIMIT 30" ).fetchall() trade_list = conn.execute( "SELECT * FROM trade_logs ORDER BY id DESC LIMIT 500" ).fetchall() from trade_log_lib import enrich_trades_for_records try: initial_capital = float(get_setting("live_capital", "0") or 0) except (TypeError, ValueError): initial_capital = 0.0 trades, equity_curve = enrich_trades_for_records( [dict(r) for r in trade_list], initial_capital=initial_capital, ) conn.close() trade_prefill_keys = ( "symbol", "symbol_name", "market_code", "sina_code", "direction", "entry_price", "stop_loss", "take_profit", "close_price", "lots", "open_time", "close_time", "pnl", ) prefill = {k: request.args.get(k) for k in trade_prefill_keys if request.args.get(k)} return render_template( "records.html", reviews=review_list, trades=trades, equity_curve=equity_curve, auto_records=auto_list, ctp_sync_info=ctp_sync_info, preset=preset, start=start, end=end, prefill=prefill, open_types=OPEN_TYPES, exit_triggers=EXIT_TRIGGERS, behavior_tags=BEHAVIOR_TAGS, kline_periods=KLINE_PERIODS, kline_cutoffs=KLINE_CUTOFFS, ) @app.route("/add_review", methods=["POST"]) @login_required def add_review(): d = request.form open_type = d.get("open_type", "").strip() exit_trigger = d.get("exit_trigger", "").strip() if not open_type: flash("请选择开仓类型") return redirect(url_for("records")) if not exit_trigger: flash("请选择离场触发") return redirect(url_for("records")) symbol = d.get("symbol", "").strip() symbol_name = d.get("symbol_name", "").strip() market_code = d.get("market_code", "").strip() sina_code = d.get("sina_code", "").strip() if not symbol or not market_code: flash("请从下拉列表选择品种(同花顺合约代码)") return redirect(url_for("records")) screenshot = "" f = request.files.get("screenshot") if f and f.filename: fname = secure_filename(f.filename) ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S") screenshot = f"{ts}_{fname}" f.save(os.path.join(UPLOAD_DIR, screenshot)) tags = [t for t in BEHAVIOR_TAGS if d.get(f"tag_{t}")] is_emotion = 1 if tags else 0 def num(key: str) -> Optional[float]: v = d.get(key, "").strip() if not v: return None return float(v) open_time = d.get("open_time", "").strip() close_time = d.get("close_time", "").strip() direction = d.get("direction", "").strip() entry_price = num("entry_price") stop_loss = num("stop_loss") take_profit = num("take_profit") close_price = num("close_price") lots = num("lots") or 1.0 holding = calc_holding_duration(open_time, close_time) initial_pnl = calc_rr_ratio(direction, entry_price, stop_loss, take_profit) actual_pnl = calc_rr_ratio(direction, entry_price, stop_loss, close_price) gross_pnl = num("pnl") if gross_pnl is None and entry_price and close_price: spec_mult = calc_position_metrics( direction, entry_price, stop_loss, take_profit, lots, close_price, 0, symbol, ) gross_pnl = spec_mult.get("float_pnl") fee = calc_round_trip_fee( symbol, entry_price or 0, close_price or 0, lots, open_time, close_time, trading_mode=_trading_mode(), ) pnl_net = round((gross_pnl or 0) - fee, 2) if gross_pnl is not None else None auto_kline = bool(d.get("auto_kline")) if auto_kline and not screenshot: try: generated = generate_review_kline_chart( symbol=symbol, periods=[d.get("kline_period1", "15m"), d.get("kline_period2", "1h")], count=int(d.get("kline_count") or 300), cutoff_label=d.get("kline_cutoff", "平仓时间"), open_time=open_time, close_time=close_time, entry_price=entry_price, stop_loss=stop_loss, take_profit=take_profit, close_price=close_price, upload_dir=UPLOAD_DIR, ) if generated: screenshot = generated except Exception as exc: app.logger.warning("auto kline failed: %s", exc) conn = get_db() conn.execute( """INSERT INTO review_records (open_time, close_time, symbol, symbol_name, market_code, sina_code, timeframe, direction, entry_price, stop_loss, take_profit, close_price, lots, holding_duration, initial_pnl, actual_pnl, pnl, fee, pnl_net, open_type, expected_rr, actual_rr, exit_trigger, exit_supplement, watch_after_breakeven, new_position_while_occupied, screenshot, auto_kline, kline_period1, kline_period2, kline_count, kline_cutoff, behavior_tags, is_emotion, notes) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( open_time, close_time, symbol, symbol_name, market_code, sina_code, d.get("timeframe", "").strip(), direction, entry_price, stop_loss, take_profit, close_price, lots, holding, initial_pnl, actual_pnl, gross_pnl, fee, pnl_net, open_type, None, None, exit_trigger, d.get("exit_supplement", "").strip(), d.get("watch_after_breakeven", "否"), d.get("new_position_while_occupied", "否"), screenshot, 1 if auto_kline else 0, d.get("kline_period1", "15m"), d.get("kline_period2", "1h"), int(d.get("kline_count") or 300), d.get("kline_cutoff", "平仓时间"), ",".join(tags), is_emotion, d.get("notes", "").strip(), ), ) hook = getattr(app, "_risk_review_hook", None) if hook: hook( conn, ",".join(tags), exit_trigger, d.get("exit_supplement", "").strip(), ) conn.commit() conn.close() touch_stats_cache() flash("复盘记录已保存") return redirect(url_for("records")) @app.route("/del_review/") @login_required def del_review(rid): conn = get_db() row = conn.execute("SELECT screenshot FROM review_records WHERE id=?", (rid,)).fetchone() if row and row["screenshot"]: path = os.path.join(UPLOAD_DIR, row["screenshot"]) if os.path.isfile(path): os.remove(path) conn.execute("DELETE FROM review_records WHERE id=?", (rid,)) conn.commit() conn.close() touch_stats_cache() flash("已删除") return redirect(url_for("records")) @app.route("/uploads/") @login_required def uploaded_file(filename): from flask import send_from_directory return send_from_directory(UPLOAD_DIR, filename) @app.route("/del_record/") @login_required def del_record(rid): conn = get_db() conn.execute("DELETE FROM trade_records WHERE id=?", (rid,)) conn.commit() conn.close() flash("已删除") return redirect(url_for("records")) @app.route("/stats") @login_required def stats(): return render_template("stats.html") @app.route("/calendar") @login_required def trade_calendar(): return render_template("calendar.html") @app.route("/api/stats") @login_required def api_stats(): return jsonify(get_stats_data()) @app.route("/api/stats/views") @login_required def api_stats_views(): return jsonify({"views": STATS_VIEWS}) @app.route("/api/stats/refresh", methods=["POST"]) @login_required def api_stats_refresh(): conn = get_db() capital = float(get_setting("live_capital", "0") or 0) data = refresh_stats_cache(conn, capital) conn.close() return jsonify(data) @app.route("/api/stats/calendar") @login_required def api_stats_calendar(): now = datetime.now(TZ) year = request.args.get("year", type=int) or now.year month = request.args.get("month", type=int) or now.month if month < 1 or month > 12: return jsonify({"error": "invalid month"}), 400 conn = get_db() try: data = get_calendar_month(conn, year, month) finally: conn.close() return jsonify(data) @app.route("/api/stats/calendar/day") @login_required def api_stats_calendar_day(): day = (request.args.get("date") or "").strip() if not day: return jsonify({"error": "date required"}), 400 try: date.fromisoformat(day) except ValueError: return jsonify({"error": "invalid date"}), 400 conn = get_db() try: data = get_calendar_day(conn, day) finally: conn.close() return jsonify(data) _dashboard_sync_tick = {"n": 0} @app.route("/dashboard") @login_required @require_nav("dashboard") def dashboard(): return render_template("dashboard.html") @app.route("/risk-guide") @login_required @require_nav("risk_guide") def risk_guide(): from doc_render import read_doc, render_markdown try: _title, raw = read_doc("risk-guide") except FileNotFoundError: flash("文档不存在") return redirect(url_for("positions")) return render_template("risk_guide.html", doc_html=render_markdown(raw)) @app.route("/api/dashboard/live") @login_required def api_dashboard_live(): if not nav_enabled(get_setting, "dashboard"): return jsonify({"ok": False, "error": "数据看板已在系统设置中关闭"}), 403 from dashboard_lib import build_dashboard_payload _dashboard_sync_tick["n"] += 1 sync_trades = _dashboard_sync_tick["n"] % 15 == 0 try: payload = build_dashboard_payload( get_db=get_db, get_setting=get_setting, fetch_price=fetch_price, sync_ctp_trades=sync_trades, ) return jsonify(payload) except Exception as exc: app.logger.exception("dashboard live: %s", exc) return jsonify({"ok": False, "error": "看板数据暂时不可用"}), 503 @app.route("/market") @login_required @require_nav("market") def market_page(): symbol = request.args.get("symbol", "").strip() period = request.args.get("period", "15m").strip() valid = {p["key"] for p in MARKET_PERIODS} if period not in valid: period = "15m" ctp_st = {} try: from vnpy_bridge import ctp_status from trading_context import get_trading_mode ctp_st = ctp_status(get_trading_mode(get_setting)) except Exception: pass return render_template( "market.html", symbol=symbol, period=period, market_periods=MARKET_PERIODS, quote_label=get_quote_source_label(ctp_connected=bool(ctp_st.get("connected"))), ctp_connected=bool(ctp_st.get("connected")), ) @app.route("/api/kline") @login_required def api_kline(): symbol = request.args.get("symbol", "").strip() period = request.args.get("period", "15m").strip() if not symbol: return jsonify({"error": "请提供合约代码"}), 400 try: from trading_context import get_trading_mode data = fetch_market_klines( symbol, period, DB_PATH, prefer_ctp=False, ) except Exception as exc: app.logger.warning("kline api failed: %s", exc) return jsonify({"error": str(exc)}), 500 if not data.get("chart_symbol"): return jsonify({"error": "无法识别合约代码"}), 400 if not data.get("bars"): return jsonify({"error": "未获取到K线数据,请稍后重试或更换合约"}), 404 return jsonify(data) @app.route("/api/kline/stream") @login_required def api_kline_stream(): from queue import Empty symbol = request.args.get("symbol", "").strip() period = request.args.get("period", "15m").strip() market_code = request.args.get("market_code", "").strip() sina_code = request.args.get("sina_code", "").strip() if not symbol: return jsonify({"error": "请提供合约代码"}), 400 def generate(): sub = kline_hub.subscribe(symbol, period, market_code, sina_code) try: kline_data = fetch_market_klines( symbol, period, DB_PATH, prefer_ctp=False, ) if kline_data.get("bars"): yield sse_format("kline", kline_data) yield sse_format( "quote", build_market_quote_payload( symbol, market_code, sina_code, prefer_sina=True, ), ) while True: try: msg = sub.queue.get(timeout=20) yield sse_format(msg["event"], msg["data"]) except Empty: yield ": heartbeat\n\n" finally: kline_hub.unsubscribe(sub) return Response( stream_with_context(generate()), mimetype="text/event-stream", headers={ "Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no", }, ) @app.route("/api/market_quote") @login_required def api_market_quote(): symbol = request.args.get("symbol", "").strip() market_code = request.args.get("market_code", "").strip() sina_code = request.args.get("sina_code", "").strip() if not symbol and not market_code: return jsonify({"error": "请提供合约"}), 400 return jsonify(build_market_quote_payload( symbol, market_code, sina_code, prefer_sina=True, )) @app.route("/contract") @login_required def contract_profile_page(): return redirect(url_for("positions")) @app.route("/api/contract_profile") @login_required def api_contract_profile(): return jsonify({"error": "品种简介功能已移除"}), 404 @app.route("/fees", methods=["GET", "POST"]) @login_required @require_nav("fees") def fees(): from trading_context import get_trading_mode from ctp_fee_worker import ( schedule_ctp_fee_sync, get_fee_last_sync, fees_synced_today, fee_sync_in_progress, ) from vnpy_bridge import ctp_status mode = get_trading_mode(get_setting) if request.method == "POST": action = request.form.get("action") if action == "sync_ctp": force = request.form.get("force") == "1" _, msg = schedule_ctp_fee_sync( mode, get_setting=get_setting, set_setting=set_setting, force=force, ) flash(msg) return redirect(url_for("fees")) rates = list_fee_rates_for_ui() fee_counts = count_fee_rates_by_source() ctp_st = ctp_status(mode) return render_template( "fees.html", rates=rates, fee_counts=fee_counts, fee_last_sync=get_fee_last_sync(get_setting), fee_synced_today=fees_synced_today(get_setting), fee_sync_running=fee_sync_in_progress(), ctp_connected=bool(ctp_st.get("connected")), ) @app.route("/api/backup/list") @login_required def api_backup_list(): return jsonify( { "dir": str(backup_dir()), "last_at": get_backup_last_at(get_setting), "running": backup_in_progress(), "items": list_backups(), } ) @app.route("/api/backup/download/") @login_required def api_backup_download(filename): from flask import send_file try: path = resolve_backup_file(filename) except (ValueError, FileNotFoundError) as exc: return jsonify({"error": str(exc)}), 404 return send_file(path, as_attachment=True, download_name=path.name) @app.route("/settings", methods=["GET", "POST"]) @login_required def settings(): if request.method == "POST": action = request.form.get("action") if action == "backup_now": ok, msg = schedule_backup( get_setting=get_setting, set_setting=set_setting, include_uploads=True, ) flash(msg if ok else msg) elif action == "backup_config": auto = request.form.get("backup_auto_enabled") == "1" set_setting("backup_auto_enabled", "1" if auto else "0") try: hour = int(request.form.get("backup_auto_hour", "3") or 3) set_setting("backup_auto_hour", str(max(0, min(23, hour)))) except ValueError: flash("自动备份小时无效") return redirect(url_for("settings")) try: keep = int(request.form.get("backup_keep_count", "30") or 30) set_setting("backup_keep_count", str(max(5, min(200, keep)))) except ValueError: flash("保留份数无效") return redirect(url_for("settings")) flash("备份策略已保存") elif action == "wechat": webhook = request.form.get("wechat_webhook", "").strip() set_setting("wechat_webhook", webhook) flash("企业微信配置已保存") elif action == "ai": set_setting("ai_enabled", "1" if request.form.get("ai_enabled") else "0") provider = (request.form.get("ai_provider") or "ollama").strip().lower() if provider not in ("ollama", "openai"): provider = "ollama" set_setting("ai_provider", provider) set_setting("ai_ollama_base_url", (request.form.get("ai_ollama_base_url") or "").strip()) set_setting("ai_ollama_model", (request.form.get("ai_ollama_model") or "").strip()) set_setting("ai_openai_base_url", (request.form.get("ai_openai_base_url") or "").strip()) key = (request.form.get("ai_openai_api_key") or "").strip() if key: set_setting("ai_openai_api_key", key) set_setting("ai_openai_model", (request.form.get("ai_openai_model") or "").strip()) set_setting("ai_daily_report_enabled", "1" if request.form.get("ai_daily_report_enabled") else "0") try: set_setting("ai_daily_report_hour", str(max(0, min(23, int(request.form.get("ai_daily_report_hour", "15") or 15))))) except ValueError: pass try: set_setting("ai_daily_report_minute", str(max(0, min(59, int(request.form.get("ai_daily_report_minute", "5") or 5))))) except ValueError: pass flash("AI 配置已保存") elif action == "trading": mode = request.form.get("trading_mode", "simulation").strip() if mode not in ("simulation", "live"): mode = "simulation" sizing = request.form.get("position_sizing_mode", "fixed").strip() if sizing == "risk": sizing = "amount" if sizing not in ("fixed", "amount"): sizing = "fixed" set_setting("trading_mode", mode) set_setting("position_sizing_mode", sizing) try: fl = int(float(request.form.get("fixed_lots", "1") or 1)) set_setting("fixed_lots", str(max(1, fl))) except ValueError: flash("固定手数无效") return redirect(url_for("settings")) try: fa = float(request.form.get("fixed_amount", "5000") or 5000) set_setting("fixed_amount", str(max(1.0, fa))) except ValueError: flash("固定金额无效") return redirect(url_for("settings")) try: rp = float(request.form.get("risk_percent", "1") or 1) set_setting("risk_percent", str(max(0.1, min(100.0, rp)))) except ValueError: pass try: mp = float(request.form.get("max_margin_pct", "30") or 30) set_setting("max_margin_pct", str(max(1.0, min(100.0, mp)))) except ValueError: flash("保证金比例无效") return redirect(url_for("settings")) try: rmp = float(request.form.get("roll_max_margin_pct", "50") or 50) set_setting("roll_max_margin_pct", str(max(1.0, min(100.0, rmp)))) except ValueError: flash("滚仓保证金比例无效") return redirect(url_for("settings")) try: tb = int(float(request.form.get("trailing_be_tick_buffer", "2") or 2)) set_setting("trailing_be_tick_buffer", str(max(1, min(20, tb)))) except ValueError: flash("移动保本缓冲无效") return redirect(url_for("settings")) try: pt = int(float(request.form.get("pending_order_timeout_min", "5") or 5)) set_setting("pending_order_timeout_min", str(max(1, min(60, pt)))) except ValueError: flash("挂单超时无效") return redirect(url_for("settings")) flash("交易模式已保存") elif action == "ctp": from ctp_settings import save_ctp_auto_connect, is_ctp_auto_connect_enabled from ctp_settings import save_ctp_settings_from_form from vnpy_bridge import ctp_disconnect was_enabled = is_ctp_auto_connect_enabled(get_setting) auto_enabled = save_ctp_auto_connect(request.form, set_setting) save_result = save_ctp_settings_from_form(request.form, set_setting) if not auto_enabled: ctp_disconnect(set_disabled_hint=True) elif not was_enabled and auto_enabled: try: from vnpy_bridge import get_bridge from trading_context import get_trading_mode mode = get_trading_mode(get_setting) get_bridge().reconnect_after_settings_saved(mode) except Exception as exc: app.logger.debug("CTP connect after enable auto: %s", exc) pwd_updated = save_result.get("passwords_updated") or [] pwd_empty = save_result.get("passwords_submitted_empty") or [] simnow_pwd_len = len((request.form.get("simnow_password") or "").strip()) live_pwd_len = len((request.form.get("ctp_live_password") or "").strip()) print( f"CTP settings save: simnow_password_len={simnow_pwd_len} " f"live_password_len={live_pwd_len} updated={pwd_updated}", flush=True, ) app.logger.info( "CTP settings save: simnow_password_len=%s live_password_len=%s updated=%s", simnow_pwd_len, live_pwd_len, pwd_updated, ) if "simnow_password" in pwd_updated: pwd_note = f"SimNow 交易密码已更新({simnow_pwd_len} 位)" elif "simnow_password" in pwd_empty: pwd_note = "SimNow 交易密码未改:提交为空,请在「交易密码」框手打后再保存" elif "ctp_live_password" in pwd_updated: pwd_note = "实盘交易密码已更新" elif "ctp_live_password" in pwd_empty: pwd_note = "实盘交易密码未改(提交为空)" else: pwd_note = "" if not auto_enabled: flash("CTP 配置已保存;自动连接已关闭,所有 CTP 连接已断开") return redirect(url_for("settings")) if not was_enabled: flash("CTP 配置已保存;自动连接已开启,正在连接…") return redirect(url_for("settings")) flash_msg = "CTP 配置已保存,正在使用新地址重连…" if pwd_note: flash_msg = f"CTP 配置已保存;{pwd_note},正在重连…" try: from vnpy_bridge import get_bridge from trading_context import get_trading_mode b = get_bridge() if pwd_updated: b._clear_login_cooldown() mode = get_trading_mode(get_setting) info = b.reconnect_after_settings_saved(mode) if info.get("cooldown"): flash_msg = f"CTP 配置已保存;{pwd_note or '请稍后再连'}" elif not info.get("started") and info.get("connected"): flash_msg = f"CTP 配置已保存;{pwd_note or '当前连接正常'}" except Exception as exc: app.logger.warning("CTP reconnect after settings save: %s", exc) flash_msg = f"CTP 配置已保存;{pwd_note or '请稍后在持仓监控页重连'}" flash(flash_msg) elif action == "nav": items = {k: request.form.get(f"nav_{k}") == "on" for k in NAV_TOGGLES} save_nav_items(set_setting, items) flash("导航显示已保存") elif action == "password": ok, msg, _ = save_admin_credentials( username=request.form.get("admin_username", ""), old_password=request.form.get("old_password", ""), new_password=request.form.get("new_password", ""), new_password2=request.form.get("new_password2", ""), get_setting=get_setting, set_setting=set_setting, ) if ok and session.get("logged_in"): session["username"] = (request.form.get("admin_username") or "").strip() flash(msg) return redirect(url_for("settings")) webhook = get_setting("wechat_webhook") username = get_setting("admin_username") ctp_st = {} try: from vnpy_bridge import ctp_status from trading_context import get_trading_mode ctp_st = ctp_status(get_trading_mode(get_setting)) except Exception: pass from ctp_settings import get_ctp_settings_for_ui, is_ctp_auto_connect_enabled from product_recommend import small_account_margin_recommendations return render_template( "settings.html", webhook=webhook, username=username, quote_label=get_quote_source_label(ctp_connected=bool(ctp_st.get("connected"))), ctp_status=ctp_st, ctp_cfg=get_ctp_settings_for_ui(), ctp_auto_connect=is_ctp_auto_connect_enabled(get_setting), trading_mode=get_setting("trading_mode", "simulation"), position_sizing_mode=get_setting("position_sizing_mode", "fixed"), fixed_lots=get_setting("fixed_lots", "1"), fixed_amount=get_setting("fixed_amount", "5000"), risk_percent=get_setting("risk_percent", "1"), max_margin_pct=get_setting("max_margin_pct", "30"), roll_max_margin_pct=get_setting("roll_max_margin_pct", "50"), small_account_margin_rec=small_account_margin_recommendations(), trailing_be_tick_buffer=get_setting("trailing_be_tick_buffer", "2"), pending_order_timeout_min=get_setting("pending_order_timeout_min", "5"), nav_items=get_nav_items(get_setting), nav_toggles=NAV_TOGGLES, backup_dir=str(backup_dir()), backup_last_at=get_backup_last_at(get_setting), backup_running=backup_in_progress(), backup_items=list_backups(), backup_auto_enabled=get_setting("backup_auto_enabled", "1") == "1", backup_auto_hour=get_setting("backup_auto_hour", "3"), backup_keep_count=get_setting("backup_keep_count", "30"), backup_restore_dir=default_restore_dir(), ai_enabled=get_setting("ai_enabled", "0") == "1", ai_provider=get_setting("ai_provider", "ollama"), ai_ollama_base_url=get_setting("ai_ollama_base_url", "http://127.0.0.1:11434"), ai_ollama_model=get_setting("ai_ollama_model", "qwen2.5:7b"), ai_openai_base_url=get_setting("ai_openai_base_url", "https://api.openai.com/v1"), ai_openai_api_key=get_setting("ai_openai_api_key", ""), ai_openai_model=get_setting("ai_openai_model", "gpt-4o-mini"), ai_daily_report_enabled=get_setting("ai_daily_report_enabled", "1") == "1", ai_daily_report_hour=get_setting("ai_daily_report_hour", "15"), ai_daily_report_minute=get_setting("ai_daily_report_minute", "5"), ) install_trading( app, login_required=login_required, require_nav=require_nav, get_db=get_db, get_setting=get_setting, set_setting=set_setting, fetch_price=fetch_price, send_wechat_msg=send_wechat_msg, ) try_init_vnpy({}) start_background_threads() # —————————————— 启动 —————————————— if __name__ == "__main__": app.run(host=HOST, port=PORT, debug=DEBUG, threaded=True)