"""Flask web app for monitoring futures order plans and key price levels.

The module keeps its state in a local SQLite file, polls quotes in a
background thread, pushes notifications to a WeChat Work webhook and offers
pages for plans, key-level monitors, trade reviews and statistics.
"""

import logging
import math
import os
import sqlite3
import threading
import time
from contextlib import closing
from datetime import datetime
from functools import wraps
from typing import Optional
from zoneinfo import ZoneInfo

import requests
from dotenv import load_dotenv
from flask import (
    Flask,
    flash,
    jsonify,
    redirect,
    render_template,
    request,
    send_from_directory,
    session,
    url_for,
)
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

from market import (
    get_price as market_get_price,
    get_quote_source_label,
    set_ths_refresh_token,
)
from symbols import search_symbols, ths_to_codes

logger = logging.getLogger(__name__)

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))

_DEFAULT_SECRET_KEY = "futures_monitor_default_secret"

app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", _DEFAULT_SECRET_KEY)
if app.secret_key == _DEFAULT_SECRET_KEY:
    # The fallback is public in the source code, so sessions signed with it
    # can be forged. Kept for backward compatibility, but made visible.
    logger.warning("SECRET_KEY is not set; using the built-in default key")

HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "6600"))
DEBUG = os.getenv("DEBUG", "false").lower() in ("1", "true", "yes")
DB_PATH = os.path.join(BASE_DIR, "futures.db")
UPLOAD_DIR = os.path.join(BASE_DIR, "uploads")
TZ = ZoneInfo("Asia/Shanghai")

# Option lists rendered by the review form (values are stored verbatim).
OPEN_TYPES = ["突破开仓", "回调开仓", "追涨杀跌", "计划内开仓", "震荡摸顶底", "其他"]
EXIT_TRIGGERS = ["止盈", "止损", "手工平仓", "移动止损", "时间离场", "其他"]
BEHAVIOR_TAGS = ["怕踏空", "报复开仓", "盈利飘了", "拿不住单", "扛单", "重仓违规"]
KLINE_PERIODS = ["1m", "3m", "5m", "15m", "30m", "1h", "4h", "1d"]
KLINE_CUTOFFS = ["平仓时间", "开仓时间", "当前时间"]

# Flash message shown when a numeric form field cannot be parsed.
_INVALID_NUMBER_MSG = "请输入有效的数字"

# Fixed statements for marking one side of a key monitor as already notified.
_MARK_TRIGGERED_SQL = {
    "upper": "UPDATE key_monitors SET upper_triggered=1 WHERE id=?",
    "lower": "UPDATE key_monitors SET lower_triggered=1 WHERE id=?",
}


def today_str() -> str:
    """Return today's date in the Asia/Shanghai zone as ``YYYY-MM-DD``."""
    return datetime.now(TZ).date().isoformat()


# —————————————— Database & settings ——————————————
def get_db():
    """Open a new SQLite connection whose rows support access by column name.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def expire_old_plans():
    """Expire plans from earlier days while keeping them as history.

    Also backfills ``plan_date`` from ``created_at`` for rows that have none.
    """
    today = today_str()
    with closing(get_db()) as conn:
        conn.execute(
            "UPDATE order_plans SET status='expired' "
            "WHERE plan_date < ? AND status IN ('planned', 'active')",
            (today,),
        )
        conn.execute(
            "UPDATE order_plans SET plan_date=date(created_at) "
            "WHERE plan_date IS NULL OR plan_date=''"
        )
        conn.commit()


def get_setting(key: str, default: str = "") -> str:
    """Return the stored value for ``key``, or ``default`` if no row exists."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT value FROM settings WHERE key=?", (key,)
        ).fetchone()
    return row["value"] if row else default


def set_setting(key: str, value: str):
    """Insert or overwrite the value stored for ``key``."""
    with closing(get_db()) as conn:
        conn.execute(
            "INSERT INTO settings (key, value) VALUES (?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=?",
            (key, value, value),
        )
        conn.commit()


def _row_value(row: sqlite3.Row, column: str, default: str = "") -> str:
    """Return ``row[column]``, or ``default`` when the column is absent or NULL."""
    if column not in row.keys():
        return default
    value = row[column]
    return default if value is None else value


def init_db():
    """Create tables, apply column migrations and seed settings from the environment."""
    with closing(get_db()) as conn:
        c = conn.cursor()
        c.execute(
            "CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value TEXT)"
        )
        c.execute(
            '''CREATE TABLE IF NOT EXISTS order_plans
               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT,
                symbol_name TEXT,
                direction TEXT,
                zone_upper REAL,
                zone_lower REAL,
                stop_loss REAL,
                take_profit REAL,
                status TEXT DEFAULT "planned",
                triggered_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)'''
        )
        c.execute(
            '''CREATE TABLE IF NOT EXISTS key_monitors
               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT,
                symbol_name TEXT,
                monitor_type TEXT,
                direction TEXT,
                upper REAL,
                lower REAL,
                upper_triggered INTEGER DEFAULT 0,
                lower_triggered INTEGER DEFAULT 0,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)'''
        )
        c.execute(
            '''CREATE TABLE IF NOT EXISTS trade_records
               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT,
                symbol_name TEXT,
                monitor_type TEXT,
                direction TEXT,
                trigger_price REAL,
                stop_loss REAL,
                take_profit REAL,
                result TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)'''
        )

        migrations = [
            "ALTER TABLE key_monitors ADD COLUMN symbol_name TEXT",
            "ALTER TABLE key_monitors ADD COLUMN upper_triggered INTEGER DEFAULT 0",
            "ALTER TABLE key_monitors ADD COLUMN lower_triggered INTEGER DEFAULT 0",
            "ALTER TABLE trade_records ADD COLUMN symbol_name TEXT",
            "ALTER TABLE order_plans ADD COLUMN sina_code TEXT",
            "ALTER TABLE order_plans ADD COLUMN market_code TEXT",
            "ALTER TABLE key_monitors ADD COLUMN market_code TEXT",
            # add_key() writes key_monitors.sina_code, so the column must exist.
            "ALTER TABLE key_monitors ADD COLUMN sina_code TEXT",
            "ALTER TABLE trade_records ADD COLUMN market_code TEXT",
            "ALTER TABLE order_plans ADD COLUMN plan_date TEXT",
        ]
        for sql in migrations:
            try:
                c.execute(sql)
            except sqlite3.OperationalError:
                # Raised when the column already exists; the migration is done.
                pass

        c.execute(
            '''CREATE TABLE IF NOT EXISTS review_records
               (id INTEGER PRIMARY KEY AUTOINCREMENT,
                open_time TEXT,
                close_time TEXT,
                symbol TEXT,
                timeframe TEXT,
                pnl REAL,
                open_type TEXT,
                expected_rr REAL,
                actual_rr REAL,
                exit_trigger TEXT,
                exit_supplement TEXT,
                watch_after_breakeven TEXT,
                new_position_while_occupied TEXT,
                screenshot TEXT,
                auto_kline INTEGER DEFAULT 0,
                kline_period1 TEXT,
                kline_period2 TEXT,
                kline_count INTEGER,
                kline_cutoff TEXT,
                behavior_tags TEXT,
                notes TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)'''
        )
        conn.commit()

    sync_admin_from_env()
    # Environment values only seed settings that are still empty.
    if not get_setting("wechat_webhook") and os.getenv("WECHAT_WEBHOOK"):
        set_setting("wechat_webhook", os.getenv("WECHAT_WEBHOOK"))
    if not get_setting("ths_refresh_token") and os.getenv("THS_REFRESH_TOKEN"):
        set_setting("ths_refresh_token", os.getenv("THS_REFRESH_TOKEN"))
    os.makedirs(UPLOAD_DIR, exist_ok=True)
    expire_old_plans()


def sync_admin_from_env():
    """Synchronise the admin account from ``.env``.

    - First initialisation: ADMIN_USERNAME / ADMIN_PASSWORD are stored
      automatically (falling back to ``admin`` / ``admin123``).
    - After initialisation: changes in ``.env`` are applied only when
      ADMIN_SYNC_FROM_ENV is true, on the next service start.
    """
    sync = os.getenv("ADMIN_SYNC_FROM_ENV", "false").lower() in ("1", "true", "yes")
    env_username = os.getenv("ADMIN_USERNAME", "").strip()
    env_password = os.getenv("ADMIN_PASSWORD", "").strip()
    placeholder_passwords = {"", "change-me-on-first-login", "admin123"}
    password_is_real = env_password not in placeholder_passwords

    if not get_setting("admin_username"):
        if not password_is_real:
            logger.warning(
                "ADMIN_PASSWORD is unset or a placeholder; "
                "creating the admin account with the default password"
            )
        set_setting("admin_username", env_username or "admin")
        set_setting(
            "admin_password_hash",
            generate_password_hash(env_password if password_is_real else "admin123"),
        )
        return

    if not sync:
        return
    if env_username:
        set_setting("admin_username", env_username)
    if env_password and password_is_real:
        set_setting("admin_password_hash", generate_password_hash(env_password))


init_db()


def sync_ths_token():
    """Hand the stored THS refresh token to the market module."""
    set_ths_refresh_token(get_setting("ths_refresh_token"))


sync_ths_token()


# —————————————— Push notifications ——————————————
def send_wechat_msg(content: str):
    """Post ``content`` to the configured WeChat webhook (best effort).

    Does nothing when no webhook is configured. Delivery failures are logged
    and never raised, so monitoring continues when the webhook is down.
    """
    webhook = get_setting("wechat_webhook")
    if not webhook:
        return
    full = f"【国内期货】\n{content}"
    data = {"msgtype": "text", "text": {"content": full}}
    try:
        requests.post(webhook, json=data, timeout=10)
    except requests.RequestException as exc:
        logger.warning("WeChat webhook push failed: %s", exc)


# —————————————— Quotes ——————————————
def resolve_market_codes(
    ths_code: str, market_code: str = "", sina_code: str = ""
) -> tuple[str, str]:
    """Return the ``(market_code, sina_code)`` pair used to fetch a quote.

    Resolution order: an explicit ``market_code``; a ``sina_code`` containing
    a dot (used as the market code); a lookup through ``ths_to_codes``; a
    ``ths_code`` with an ``nf_`` / ``CFF_RE_`` prefix (used for both codes).
    Otherwise the market code is empty.
    """
    if market_code:
        return market_code, sina_code
    if sina_code and "." in sina_code:
        return sina_code, ""
    codes = ths_to_codes(ths_code)
    if codes:
        return codes["market_code"], codes["sina_code"]
    if ths_code.startswith(("nf_", "CFF_RE_")):
        return ths_code, ths_code
    return "", sina_code or ""


def fetch_price(
    ths_code: str, market_code: str = "", sina_code: str = ""
) -> Optional[float]:
    """Return the current price for a contract, or None if no code resolves."""
    mc, sc = resolve_market_codes(ths_code, market_code, sina_code)
    if not mc and not sc:
        return None
    return market_get_price(mc, sc)


# —————————————— Monitoring logic ——————————————
def _plan_exit_result(
    direction: str, price: float, stop_loss: float, take_profit: float
) -> Optional[str]:
    """Return "止盈" / "止损" when ``price`` hits a target, otherwise None."""
    if direction == "long":
        if price >= take_profit:
            return "止盈"
        if price <= stop_loss:
            return "止损"
    elif direction == "short":
        if price <= take_profit:
            return "止盈"
        if price >= stop_loss:
            return "止损"
    return None


def _process_order_plan(conn: sqlite3.Connection, row: sqlite3.Row):
    """Advance one plan: activate it on zone entry, close it on target hit."""
    sym = row["symbol"]
    price = fetch_price(
        sym, _row_value(row, "market_code"), _row_value(row, "sina_code")
    )
    if not price:
        return

    direction = row["direction"]
    zone_upper = row["zone_upper"]
    zone_lower = row["zone_lower"]
    stop_loss = row["stop_loss"]
    take_profit = row["take_profit"]
    status = row["status"]
    pid = row["id"]
    name = row["symbol_name"] or sym
    side = "做多" if direction == "long" else "做空"

    # Planned: activate and notify once the price enters the decision zone.
    if status == "planned" and zone_lower <= price <= zone_upper:
        send_wechat_msg(
            f"【开单计划触发】{name} ({sym})\n"
            f"方向:{side}\n"
            f"决策区间:{zone_lower} ~ {zone_upper}\n"
            f"当前价:{price}\n"
            f"止损:{stop_loss} 止盈:{take_profit}"
        )
        # Shanghai wall-clock time, stored without an offset to keep the
        # existing column format while matching how plan_date is computed.
        triggered_at = datetime.now(TZ).replace(tzinfo=None).isoformat()
        conn.execute(
            "UPDATE order_plans SET status='active', triggered_at=? WHERE id=?",
            (triggered_at, pid),
        )
        status = "active"

    # Active: watch take-profit / stop-loss (also on the activation tick).
    if status != "active":
        return
    res = _plan_exit_result(direction, price, stop_loss, take_profit)
    if res is None:
        return

    send_wechat_msg(
        f"[{side}] {name} 已{res}\n"
        f"决策区间:{zone_lower} ~ {zone_upper}\n"
        f"止损:{stop_loss} 止盈:{take_profit}\n"
        f"当前价:{price}"
    )
    conn.execute(
        """INSERT INTO trade_records
           (symbol, symbol_name, monitor_type, direction,
            trigger_price, stop_loss, take_profit, result)
           VALUES (?,?,?,?,?,?,?,?)""",
        (sym, name, "开单计划", direction, price, stop_loss, take_profit, res),
    )
    conn.execute("UPDATE order_plans SET status='closed' WHERE id=?", (pid,))


def check_order_plans():
    """Evaluate today's planned/active order plans against current prices.

    Each plan is committed on its own, so a notification that has been sent
    is always matched by the persisted status change, and one failing plan
    (bad data, quote error) cannot block the others.
    """
    expire_old_plans()
    today = today_str()
    with closing(get_db()) as conn:
        rows = conn.execute(
            "SELECT * FROM order_plans "
            "WHERE plan_date=? AND status IN ('planned', 'active')",
            (today,),
        ).fetchall()
        for row in rows:
            try:
                _process_order_plan(conn, row)
                conn.commit()
            except Exception:
                conn.rollback()
                logger.exception("Failed to process order plan id=%s", row["id"])


def _key_monitor_alerts(row: sqlite3.Row, price: float) -> list[tuple[str, str]]:
    """Return ``(side, message)`` pairs for each boundary newly crossed.

    ``side`` is "upper" or "lower"; sides already flagged as triggered are
    never reported again.
    """
    typ = row["monitor_type"]
    up = row["upper"]
    low = row["lower"]
    name = row["symbol_name"] or row["symbol"]
    can_up = not row["upper_triggered"]
    can_low = not row["lower_triggered"]

    alerts: list[tuple[str, str]] = []
    if typ in ("箱体突破", "收敛突破"):
        # Range-type monitors watch both boundaries independently.
        if price > up and can_up:
            alerts.append(("upper", f"{name} 突破{typ}上沿 {up}\n当前价:{price}"))
        if price < low and can_low:
            alerts.append(("lower", f"{name} 跌破{typ}下沿 {low}\n当前价:{price}"))
    elif typ == "关键阻力位" and price > up and can_up:
        alerts.append(("upper", f"{name} 突破阻力位 {up}\n当前价:{price}"))
    elif typ == "关键支撑位" and price < low and can_low:
        alerts.append(("lower", f"{name} 跌破支撑位 {low}\n当前价:{price}"))
    return alerts


def check_key_monitors():
    """Notify once per boundary when a monitored key level is crossed.

    Rows are committed individually and failures are logged per row, so an
    error on one monitor neither blocks the rest nor causes already-sent
    notifications to be repeated.
    """
    with closing(get_db()) as conn:
        rows = conn.execute("SELECT * FROM key_monitors").fetchall()
        for row in rows:
            try:
                price = fetch_price(
                    row["symbol"],
                    _row_value(row, "market_code"),
                    _row_value(row, "sina_code"),
                )
                if not price:
                    continue
                for side, message in _key_monitor_alerts(row, price):
                    send_wechat_msg(message)
                    conn.execute(_MARK_TRIGGERED_SQL[side], (row["id"],))
                conn.commit()
            except Exception:
                conn.rollback()
                logger.exception("Failed to process key monitor id=%s", row["id"])


def background_task(interval: float = 3.0):
    """Run the monitoring steps forever, sleeping ``interval`` seconds between rounds.

    Each step is isolated: a failure is logged with its traceback and the
    remaining steps still run, so the loop never dies silently.
    """
    steps = (expire_old_plans, check_key_monitors, check_order_plans)
    while True:
        for step in steps:
            try:
                step()
            except Exception:
                logger.exception("Background step %s failed", step.__name__)
        time.sleep(interval)


# —————————————— Login ——————————————
def login_required(f):
    """Decorator redirecting to the login page unless the session is logged in."""

    @wraps(f)
    def wrap(*args, **kwargs):
        if not session.get("logged_in"):
            return redirect(url_for("login"))
        return f(*args, **kwargs)

    return wrap


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and authenticate against the stored admin account."""
    if request.method == "POST":
        u = request.form.get("username", "").strip()
        p = request.form.get("password", "")
        admin_u = get_setting("admin_username")
        admin_hash = get_setting("admin_password_hash")
        if u == admin_u and check_password_hash(admin_hash, p):
            session["logged_in"] = True
            session["username"] = u
            return redirect(url_for("plans"))
        flash("账号或密码错误")
    return render_template("login.html")


@app.route("/logout")
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    return redirect(url_for("login"))


# —————————————— API ——————————————
@app.route("/api/symbols/search")
@login_required
def api_symbol_search():
    """Return symbol search results for the ``q`` query parameter as JSON."""
    q = request.args.get("q", "")
    return jsonify(search_symbols(q))


# —————————————— Form helpers ——————————————
def _form_float(form, name: str) -> float:
    """Parse a required, finite float from ``form``; raise ValueError otherwise."""
    value = float(form.get(name, "").strip())
    if not math.isfinite(value):
        raise ValueError(f"{name} must be a finite number")
    return value


def _optional_form_float(form, name: str) -> Optional[float]:
    """Parse an optional float from ``form``; blank input yields None."""
    if not form.get(name, "").strip():
        return None
    return _form_float(form, name)


# —————————————— Page routes ——————————————
@app.route("/")
@login_required
def index():
    """Redirect the site root to the plans page."""
    return redirect(url_for("plans"))


@app.route("/plans")
@login_required
def plans():
    """List today's open plans plus history, optionally filtered by date range."""
    today = today_str()
    start = request.args.get("start", "")
    end = request.args.get("end", "")

    # The OR must be parenthesised: AND binds tighter, so without the
    # parentheses the date filters would only constrain the status branch.
    sql = (
        "SELECT * FROM order_plans "
        "WHERE (plan_date < ? OR status IN ('closed', 'expired'))"
    )
    params: list = [today]
    if start:
        sql += " AND plan_date >= ?"
        params.append(start)
    if end:
        sql += " AND plan_date <= ?"
        params.append(end)
    sql += " ORDER BY plan_date DESC, id DESC LIMIT 200"

    with closing(get_db()) as conn:
        plan_list = conn.execute(
            "SELECT * FROM order_plans "
            "WHERE plan_date=? AND status IN ('planned', 'active') "
            "ORDER BY id DESC",
            (today,),
        ).fetchall()
        history = conn.execute(sql, params).fetchall()

    return render_template(
        "plans.html",
        plans=plan_list,
        history=history,
        today=today,
        start=start,
        end=end,
    )


@app.route("/add_plan", methods=["POST"])
@login_required
def add_plan():
    """Create an order plan for today from the submitted form."""
    d = request.form
    direction = d.get("direction")
    symbol = d.get("symbol", "").strip()
    symbol_name = d.get("symbol_name", "").strip()
    market_code = d.get("market_code", "").strip()
    sina_code = d.get("sina_code", "").strip()

    if not direction:
        flash("请选择多空方向")
        return redirect(url_for("plans"))
    if not symbol or not market_code:
        flash("请从下拉列表选择品种(同花顺合约代码)")
        return redirect(url_for("plans"))
    try:
        zone_upper = _form_float(d, "zone_upper")
        zone_lower = _form_float(d, "zone_lower")
        stop_loss = _form_float(d, "stop_loss")
        take_profit = _form_float(d, "take_profit")
    except ValueError:
        flash(_INVALID_NUMBER_MSG)
        return redirect(url_for("plans"))

    with closing(get_db()) as conn:
        conn.execute(
            """INSERT INTO order_plans
               (symbol, symbol_name, market_code, sina_code, direction,
                zone_upper, zone_lower, stop_loss, take_profit, plan_date)
               VALUES (?,?,?,?,?,?,?,?,?,?)""",
            (
                symbol,
                symbol_name,
                market_code,
                sina_code,
                direction,
                zone_upper,
                zone_lower,
                stop_loss,
                take_profit,
                today_str(),
            ),
        )
        conn.commit()
    flash("开单计划已添加")
    return redirect(url_for("plans"))


@app.route("/del_plan/<int:pid>")
@login_required
def del_plan(pid):
    """Delete the order plan with id ``pid``."""
    with closing(get_db()) as conn:
        conn.execute("DELETE FROM order_plans WHERE id=?", (pid,))
        conn.commit()
    flash("已删除")
    return redirect(url_for("plans"))


@app.route("/keys")
@login_required
def keys():
    """List all key-level monitors, newest first."""
    with closing(get_db()) as conn:
        key_list = conn.execute(
            "SELECT * FROM key_monitors ORDER BY id DESC"
        ).fetchall()
    return render_template("keys.html", keys=key_list)


@app.route("/add_key", methods=["POST"])
@login_required
def add_key():
    """Create a key-level monitor from the submitted form."""
    d = request.form
    direction = d.get("direction")
    symbol = d.get("symbol", "").strip()
    symbol_name = d.get("symbol_name", "").strip()
    market_code = d.get("market_code", "").strip()
    sina_code = d.get("sina_code", "").strip()

    if not direction:
        flash("请选择多空方向")
        return redirect(url_for("keys"))
    if not symbol or not market_code:
        flash("请从下拉列表选择品种(同花顺合约代码)")
        return redirect(url_for("keys"))
    try:
        upper = _form_float(d, "upper")
        lower = _form_float(d, "lower")
    except ValueError:
        flash(_INVALID_NUMBER_MSG)
        return redirect(url_for("keys"))

    with closing(get_db()) as conn:
        conn.execute(
            """INSERT INTO key_monitors
               (symbol, symbol_name, market_code, sina_code,
                monitor_type, direction, upper, lower)
               VALUES (?,?,?,?,?,?,?,?)""",
            (
                symbol,
                symbol_name,
                market_code,
                sina_code,
                d["type"],
                direction,
                upper,
                lower,
            ),
        )
        conn.commit()
    flash("关键位监控已添加")
    return redirect(url_for("keys"))


@app.route("/del_key/<int:pid>")
@login_required
def del_key(pid):
    """Delete the key-level monitor with id ``pid``."""
    with closing(get_db()) as conn:
        conn.execute("DELETE FROM key_monitors WHERE id=?", (pid,))
        conn.commit()
    flash("已删除")
    return redirect(url_for("keys"))


@app.route("/records")
@login_required
def records():
    """Show manual review records (date-filterable) and automatic trade records."""
    start = request.args.get("start", "")
    end = request.args.get("end", "")

    sql = "SELECT * FROM review_records WHERE 1=1"
    params: list = []
    if start:
        sql += " AND date(close_time) >= ?"
        params.append(start)
    if end:
        sql += " AND date(close_time) <= ?"
        params.append(end)
    sql += " ORDER BY id DESC LIMIT 200"

    with closing(get_db()) as conn:
        review_list = conn.execute(sql, params).fetchall()
        auto_list = conn.execute(
            "SELECT * FROM trade_records ORDER BY id DESC LIMIT 50"
        ).fetchall()

    return render_template(
        "records.html",
        reviews=review_list,
        auto_records=auto_list,
        start=start,
        end=end,
        open_types=OPEN_TYPES,
        exit_triggers=EXIT_TRIGGERS,
        behavior_tags=BEHAVIOR_TAGS,
        kline_periods=KLINE_PERIODS,
        kline_cutoffs=KLINE_CUTOFFS,
    )


@app.route("/add_review", methods=["POST"])
@login_required
def add_review():
    """Store a manual trade review, including an optional screenshot upload."""
    d = request.form
    open_type = d.get("open_type", "").strip()
    exit_trigger = d.get("exit_trigger", "").strip()
    if not open_type:
        flash("请选择开仓类型")
        return redirect(url_for("records"))
    if not exit_trigger:
        flash("请选择离场触发")
        return redirect(url_for("records"))

    # Validate numbers before touching the disk so that a rejected request
    # does not leave an orphaned screenshot behind.
    try:
        pnl = _optional_form_float(d, "pnl")
        expected_rr = _optional_form_float(d, "expected_rr")
        actual_rr = _optional_form_float(d, "actual_rr")
        kline_count = int(d.get("kline_count") or 300)
    except ValueError:
        flash(_INVALID_NUMBER_MSG)
        return redirect(url_for("records"))

    screenshot = ""
    f = request.files.get("screenshot")
    if f and f.filename:
        fname = secure_filename(f.filename)
        ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S")
        screenshot = f"{ts}_{fname}"
        f.save(os.path.join(UPLOAD_DIR, screenshot))

    tags = [t for t in BEHAVIOR_TAGS if d.get(f"tag_{t}")]

    # Column names, placeholders and values all come from this one mapping,
    # so their counts can never drift apart.
    record = {
        "open_time": d.get("open_time", "").strip(),
        "close_time": d.get("close_time", "").strip(),
        "symbol": d.get("symbol", "").strip(),
        "timeframe": d.get("timeframe", "").strip(),
        "pnl": pnl,
        "open_type": open_type,
        "expected_rr": expected_rr,
        "actual_rr": actual_rr,
        "exit_trigger": exit_trigger,
        "exit_supplement": d.get("exit_supplement", "").strip(),
        "watch_after_breakeven": d.get("watch_after_breakeven", "否"),
        "new_position_while_occupied": d.get("new_position_while_occupied", "否"),
        "screenshot": screenshot,
        "auto_kline": 1 if d.get("auto_kline") else 0,
        "kline_period1": d.get("kline_period1", "15m"),
        "kline_period2": d.get("kline_period2", "1h"),
        "kline_count": kline_count,
        "kline_cutoff": d.get("kline_cutoff", "平仓时间"),
        "behavior_tags": ",".join(tags),
        "notes": d.get("notes", "").strip(),
    }
    columns = ", ".join(record)
    placeholders = ",".join("?" * len(record))

    with closing(get_db()) as conn:
        conn.execute(
            f"INSERT INTO review_records ({columns}) VALUES ({placeholders})",
            tuple(record.values()),
        )
        conn.commit()
    flash("复盘记录已保存")
    return redirect(url_for("records"))


@app.route("/del_review/<int:rid>")
@login_required
def del_review(rid):
    """Delete review ``rid`` together with its screenshot file, if present."""
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT screenshot FROM review_records WHERE id=?", (rid,)
        ).fetchone()
        if row and row["screenshot"]:
            path = os.path.join(UPLOAD_DIR, row["screenshot"])
            if os.path.isfile(path):
                os.remove(path)
        conn.execute("DELETE FROM review_records WHERE id=?", (rid,))
        conn.commit()
    flash("已删除")
    return redirect(url_for("records"))


@app.route("/uploads/<path:filename>")
@login_required
def uploaded_file(filename):
    """Serve an uploaded screenshot from the upload directory."""
    return send_from_directory(UPLOAD_DIR, filename)


@app.route("/del_record/<int:rid>")
@login_required
def del_record(rid):
    """Delete the automatic trade record with id ``rid``."""
    with closing(get_db()) as conn:
        conn.execute("DELETE FROM trade_records WHERE id=?", (rid,))
        conn.commit()
    flash("已删除")
    return redirect(url_for("records"))


@app.route("/stats")
@login_required
def stats():
    """Show win/loss statistics computed from the automatic trade records."""
    with closing(get_db()) as conn:
        total = conn.execute(
            "SELECT COUNT(*) FROM trade_records WHERE result IN ('止盈','止损')"
        ).fetchone()[0]
        win = conn.execute(
            "SELECT COUNT(*) FROM trade_records WHERE result='止盈'"
        ).fetchone()[0]
        loss = conn.execute(
            "SELECT COUNT(*) FROM trade_records WHERE result='止损'"
        ).fetchone()[0]

        by_symbol = conn.execute(
            """SELECT symbol_name, symbol, COUNT(*) as cnt,
                      SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins
               FROM trade_records
               WHERE result IN ('止盈','止损')
               GROUP BY symbol
               ORDER BY cnt DESC"""
        ).fetchall()
        by_type = conn.execute(
            """SELECT monitor_type, COUNT(*) as cnt,
                      SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins
               FROM trade_records
               WHERE result IN ('止盈','止损')
               GROUP BY monitor_type
               ORDER BY cnt DESC"""
        ).fetchall()
        by_direction = conn.execute(
            """SELECT direction, COUNT(*) as cnt,
                      SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins
               FROM trade_records
               WHERE result IN ('止盈','止损')
               GROUP BY direction"""
        ).fetchall()
        recent = conn.execute(
            "SELECT * FROM trade_records ORDER BY id DESC LIMIT 10"
        ).fetchall()

    # Win rate in percent; 0 when there are no decided trades yet.
    rate = round(win / total * 100, 2) if total else 0

    return render_template(
        "stats.html",
        total=total,
        win=win,
        loss=loss,
        rate=rate,
        by_symbol=by_symbol,
        by_type=by_type,
        by_direction=by_direction,
        recent=recent,
    )


@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
    """Show the settings page; handle webhook and password changes on POST."""
    if request.method == "POST":
        action = request.form.get("action")
        if action == "wechat":
            webhook = request.form.get("wechat_webhook", "").strip()
            set_setting("wechat_webhook", webhook)
            flash("企业微信配置已保存")
        elif action == "password":
            old_p = request.form.get("old_password", "")
            new_p = request.form.get("new_password", "")
            new_p2 = request.form.get("new_password2", "")
            admin_hash = get_setting("admin_password_hash")
            if not check_password_hash(admin_hash, old_p):
                flash("原密码错误")
            elif len(new_p) < 6:
                flash("新密码至少 6 位")
            elif new_p != new_p2:
                flash("两次新密码不一致")
            else:
                set_setting("admin_password_hash", generate_password_hash(new_p))
                flash("密码修改成功")
        return redirect(url_for("settings"))

    webhook = get_setting("wechat_webhook")
    username = get_setting("admin_username")
    return render_template(
        "settings.html",
        webhook=webhook,
        username=username,
        quote_label=get_quote_source_label(),
    )


# —————————————— Startup ——————————————
if __name__ == "__main__":
    threading.Thread(target=background_task, daemon=True).start()
    app.run(host=HOST, port=PORT, debug=DEBUG)