import os import sqlite3 import time import threading import requests from datetime import datetime from typing import Optional from functools import wraps from dotenv import load_dotenv from flask import ( Flask, render_template, request, redirect, url_for, flash, session, jsonify, ) from werkzeug.security import check_password_hash, generate_password_hash from symbols import search_symbols, ths_to_codes from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label load_dotenv(os.path.join(os.path.dirname(os.path.abspath(__file__)), ".env")) app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "futures_monitor_default_secret") HOST = os.getenv("HOST", "0.0.0.0") PORT = int(os.getenv("PORT", "6600")) DEBUG = os.getenv("DEBUG", "false").lower() in ("1", "true", "yes") DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db") # —————————————— 设置读写 —————————————— def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def get_setting(key: str, default: str = "") -> str: conn = get_db() row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone() conn.close() return row["value"] if row else default def set_setting(key: str, value: str): conn = get_db() conn.execute( "INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=?", (key, value, value), ) conn.commit() conn.close() def init_db(): conn = get_db() c = conn.cursor() c.execute("CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value TEXT)") c.execute('''CREATE TABLE IF NOT EXISTS order_plans (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, direction TEXT, zone_upper REAL, zone_lower REAL, stop_loss REAL, take_profit REAL, status TEXT DEFAULT "planned", triggered_at TIMESTAMP, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS key_monitors (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT, upper REAL, lower REAL, upper_triggered INTEGER DEFAULT 0, lower_triggered INTEGER DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') c.execute('''CREATE TABLE IF NOT EXISTS trade_records (id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT, trigger_price REAL, stop_loss REAL, take_profit REAL, result TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''') migrations = [ "ALTER TABLE key_monitors ADD COLUMN symbol_name TEXT", "ALTER TABLE key_monitors ADD COLUMN upper_triggered INTEGER DEFAULT 0", "ALTER TABLE key_monitors ADD COLUMN lower_triggered INTEGER DEFAULT 0", "ALTER TABLE trade_records ADD COLUMN symbol_name TEXT", "ALTER TABLE order_plans ADD COLUMN sina_code TEXT", "ALTER TABLE order_plans ADD COLUMN market_code TEXT", "ALTER TABLE key_monitors ADD COLUMN market_code TEXT", "ALTER TABLE trade_records ADD COLUMN market_code TEXT", ] for sql in migrations: try: c.execute(sql) except sqlite3.OperationalError: pass conn.commit() conn.close() sync_admin_from_env() if not get_setting("wechat_webhook") and os.getenv("WECHAT_WEBHOOK"): set_setting("wechat_webhook", os.getenv("WECHAT_WEBHOOK")) if not get_setting("ths_refresh_token") and os.getenv("THS_REFRESH_TOKEN"): set_setting("ths_refresh_token", os.getenv("THS_REFRESH_TOKEN")) def sync_admin_from_env(): """ 从 .env 同步管理员账号。 - 首次建库:自动写入 ADMIN_USERNAME / ADMIN_PASSWORD - 已建库后改 .env:需设 ADMIN_SYNC_FROM_ENV=true 并重启服务 """ sync = os.getenv("ADMIN_SYNC_FROM_ENV", "false").lower() in ("1", "true", "yes") env_username = os.getenv("ADMIN_USERNAME", "").strip() env_password = os.getenv("ADMIN_PASSWORD", "").strip() placeholder_passwords = {"", "change-me-on-first-login", "admin123"} if not get_setting("admin_username"): username = env_username or "admin" password = env_password if env_password not in placeholder_passwords else "admin123" set_setting("admin_username", username) set_setting("admin_password_hash", generate_password_hash(password)) return if not sync: return if env_username: set_setting("admin_username", env_username) if env_password and env_password not in placeholder_passwords: set_setting("admin_password_hash", generate_password_hash(env_password)) init_db() def sync_ths_token(): set_ths_refresh_token(get_setting("ths_refresh_token")) sync_ths_token() # —————————————— 推送 —————————————— def send_wechat_msg(content: str): webhook = get_setting("wechat_webhook") if not webhook: return full = f"【国内期货】\n{content}" data = {"msgtype": "text", "text": {"content": full}} try: requests.post(webhook, json=data, timeout=10) except Exception: pass # —————————————— 行情 —————————————— def resolve_market_codes(ths_code: str, market_code: str = "", sina_code: str = "") -> tuple[str, str]: """返回 (market_code, sina_code) 用于行情拉取。""" if market_code: return market_code, sina_code if sina_code and "." in sina_code: return sina_code, "" codes = ths_to_codes(ths_code) if codes: return codes["market_code"], codes["sina_code"] if ths_code.startswith("nf_") or ths_code.startswith("CFF_RE_"): return ths_code, ths_code return "", sina_code or "" def fetch_price(ths_code: str, market_code: str = "", sina_code: str = "") -> Optional[float]: mc, sc = resolve_market_codes(ths_code, market_code, sina_code) if not mc and not sc: return None return market_get_price(mc, sc) # —————————————— 监控逻辑 —————————————— def check_order_plans(): conn = get_db() rows = conn.execute( "SELECT * FROM order_plans WHERE status IN ('planned', 'active')" ).fetchall() for r in rows: sym = r["symbol"] sina = r["sina_code"] if "sina_code" in r.keys() else "" market = r["market_code"] if "market_code" in r.keys() else "" p = fetch_price(sym, market, sina) if not p: continue direction = r["direction"] zone_upper = r["zone_upper"] zone_lower = r["zone_lower"] stop_loss = r["stop_loss"] take_profit = r["take_profit"] status = r["status"] pid = r["id"] name = r["symbol_name"] or sym # 计划状态:价格进入决策区间则激活并通知 if status == "planned": in_zone = zone_lower <= p <= zone_upper if in_zone: msg = ( f"【开单计划触发】{name} ({sym})\n" f"方向:{'做多' if direction == 'long' else '做空'}\n" f"决策区间:{zone_lower} ~ {zone_upper}\n" f"当前价:{p}\n" f"止损:{stop_loss} 止盈:{take_profit}" ) send_wechat_msg(msg) conn.execute( "UPDATE order_plans SET status='active', triggered_at=? WHERE id=?", (datetime.now().isoformat(), pid), ) status = "active" # 激活状态:监控止盈止损 if status == "active": res = None if direction == "long": if p >= take_profit: res = "止盈" elif p <= stop_loss: res = "止损" elif direction == "short": if p <= take_profit: res = "止盈" elif p >= stop_loss: res = "止损" if res: msg = ( f"[{'做多' if direction == 'long' else '做空'}] {name} 已{res}\n" f"决策区间:{zone_lower} ~ {zone_upper}\n" f"止损:{stop_loss} 止盈:{take_profit}\n" f"当前价:{p}" ) send_wechat_msg(msg) conn.execute( """INSERT INTO trade_records (symbol, symbol_name, monitor_type, direction, trigger_price, stop_loss, take_profit, result) VALUES (?,?,?,?,?,?,?,?)""", (sym, name, "开单计划", direction, p, stop_loss, take_profit, res), ) conn.execute( "UPDATE order_plans SET status='closed' WHERE id=?", (pid,) ) conn.commit() conn.close() def check_key_monitors(): conn = get_db() rows = conn.execute("SELECT * FROM key_monitors").fetchall() for r in rows: sym = r["symbol"] typ = r["monitor_type"] up = r["upper"] low = r["lower"] up_trig = r["upper_triggered"] low_trig = r["lower_triggered"] name = r["symbol_name"] or sym pid = r["id"] sina = r["sina_code"] if "sina_code" in r.keys() else "" market = r["market_code"] if "market_code" in r.keys() else "" p = fetch_price(sym, market, sina) if not p: continue if typ in ("箱体突破", "收敛突破"): if p > up and not up_trig: send_wechat_msg(f"{name} 突破{typ}上沿 {up}\n当前价:{p}") conn.execute( "UPDATE key_monitors SET upper_triggered=1 WHERE id=?", (pid,) ) if p < low and not low_trig: send_wechat_msg(f"{name} 跌破{typ}下沿 {low}\n当前价:{p}") conn.execute( "UPDATE key_monitors SET lower_triggered=1 WHERE id=?", (pid,) ) elif typ == "关键阻力位" and p > up and not up_trig: send_wechat_msg(f"{name} 突破阻力位 {up}\n当前价:{p}") conn.execute( "UPDATE key_monitors SET upper_triggered=1 WHERE id=?", (pid,) ) elif typ == "关键支撑位" and p < low and not low_trig: send_wechat_msg(f"{name} 跌破支撑位 {low}\n当前价:{p}") conn.execute( "UPDATE key_monitors SET lower_triggered=1 WHERE id=?", (pid,) ) conn.commit() conn.close() def background_task(): while True: try: check_key_monitors() check_order_plans() except Exception: pass time.sleep(3) # —————————————— 登录 —————————————— def login_required(f): @wraps(f) def wrap(*args, **kwargs): if not session.get("logged_in"): return redirect(url_for("login")) return f(*args, **kwargs) return wrap @app.route("/login", methods=["GET", "POST"]) def login(): if request.method == "POST": u = request.form.get("username", "").strip() p = request.form.get("password", "") admin_u = get_setting("admin_username") admin_hash = get_setting("admin_password_hash") if u == admin_u and check_password_hash(admin_hash, p): session["logged_in"] = True session["username"] = u return redirect(url_for("plans")) flash("账号或密码错误") return render_template("login.html") @app.route("/logout") def logout(): session.clear() return redirect(url_for("login")) # —————————————— API —————————————— @app.route("/api/symbols/search") @login_required def api_symbol_search(): q = request.args.get("q", "") return jsonify(search_symbols(q)) # —————————————— 页面路由 —————————————— @app.route("/") @login_required def index(): return redirect(url_for("plans")) @app.route("/plans") @login_required def plans(): conn = get_db() plan_list = conn.execute( "SELECT * FROM order_plans WHERE status != 'closed' ORDER BY id DESC" ).fetchall() closed = conn.execute( "SELECT * FROM order_plans WHERE status='closed' ORDER BY id DESC LIMIT 20" ).fetchall() conn.close() return render_template("plans.html", plans=plan_list, closed=closed) @app.route("/add_plan", methods=["POST"]) @login_required def add_plan(): d = request.form direction = d.get("direction") symbol = d.get("symbol", "").strip() symbol_name = d.get("symbol_name", "").strip() market_code = d.get("market_code", "").strip() sina_code = d.get("sina_code", "").strip() if not direction: flash("请选择多空方向") return redirect(url_for("plans")) if not symbol or not market_code: flash("请从下拉列表选择品种(同花顺合约代码)") return redirect(url_for("plans")) conn = get_db() conn.execute( """INSERT INTO order_plans (symbol, symbol_name, market_code, sina_code, direction, zone_upper, zone_lower, stop_loss, take_profit) VALUES (?,?,?,?,?,?,?,?)""", ( symbol, symbol_name, market_code, sina_code, direction, float(d["zone_upper"]), float(d["zone_lower"]), float(d["stop_loss"]), float(d["take_profit"]), ), ) conn.commit() conn.close() flash("开单计划已添加") return redirect(url_for("plans")) @app.route("/del_plan/") @login_required def del_plan(pid): conn = get_db() conn.execute("DELETE FROM order_plans WHERE id=?", (pid,)) conn.commit() conn.close() flash("已删除") return redirect(url_for("plans")) @app.route("/keys") @login_required def keys(): conn = get_db() key_list = conn.execute("SELECT * FROM key_monitors ORDER BY id DESC").fetchall() conn.close() return render_template("keys.html", keys=key_list) @app.route("/add_key", methods=["POST"]) @login_required def add_key(): d = request.form direction = d.get("direction") symbol = d.get("symbol", "").strip() symbol_name = d.get("symbol_name", "").strip() market_code = d.get("market_code", "").strip() sina_code = d.get("sina_code", "").strip() if not direction: flash("请选择多空方向") return redirect(url_for("keys")) if not symbol or not market_code: flash("请从下拉列表选择品种(同花顺合约代码)") return redirect(url_for("keys")) conn = get_db() conn.execute( """INSERT INTO key_monitors (symbol, symbol_name, market_code, sina_code, monitor_type, direction, upper, lower) VALUES (?,?,?,?,?,?,?,?)""", (symbol, symbol_name, market_code, sina_code, d["type"], direction, float(d["upper"]), float(d["lower"])), ) conn.commit() conn.close() flash("关键位监控已添加") return redirect(url_for("keys")) @app.route("/del_key/") @login_required def del_key(pid): conn = get_db() conn.execute("DELETE FROM key_monitors WHERE id=?", (pid,)) conn.commit() conn.close() flash("已删除") return redirect(url_for("keys")) @app.route("/records") @login_required def records(): conn = get_db() record_list = conn.execute( "SELECT * FROM trade_records ORDER BY id DESC" ).fetchall() conn.close() return render_template("records.html", records=record_list) @app.route("/del_record/") @login_required def del_record(rid): conn = get_db() conn.execute("DELETE FROM trade_records WHERE id=?", (rid,)) conn.commit() conn.close() flash("已删除") return redirect(url_for("records")) @app.route("/stats") @login_required def stats(): conn = get_db() total = conn.execute( "SELECT COUNT(*) FROM trade_records WHERE result IN ('止盈','止损')" ).fetchone()[0] win = conn.execute( "SELECT COUNT(*) FROM trade_records WHERE result='止盈'" ).fetchone()[0] loss = conn.execute( "SELECT COUNT(*) FROM trade_records WHERE result='止损'" ).fetchone()[0] rate = round(win / total * 100, 2) if total else 0 by_symbol = conn.execute( """SELECT symbol_name, symbol, COUNT(*) as cnt, SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins FROM trade_records WHERE result IN ('止盈','止损') GROUP BY symbol ORDER BY cnt DESC""" ).fetchall() by_type = conn.execute( """SELECT monitor_type, COUNT(*) as cnt, SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins FROM trade_records WHERE result IN ('止盈','止损') GROUP BY monitor_type ORDER BY cnt DESC""" ).fetchall() by_direction = conn.execute( """SELECT direction, COUNT(*) as cnt, SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins FROM trade_records WHERE result IN ('止盈','止损') GROUP BY direction""" ).fetchall() recent = conn.execute( "SELECT * FROM trade_records ORDER BY id DESC LIMIT 10" ).fetchall() conn.close() return render_template( "stats.html", total=total, win=win, loss=loss, rate=rate, by_symbol=by_symbol, by_type=by_type, by_direction=by_direction, recent=recent, ) @app.route("/settings", methods=["GET", "POST"]) @login_required def settings(): if request.method == "POST": action = request.form.get("action") if action == "wechat": webhook = request.form.get("wechat_webhook", "").strip() set_setting("wechat_webhook", webhook) flash("企业微信配置已保存") elif action == "password": old_p = request.form.get("old_password", "") new_p = request.form.get("new_password", "") new_p2 = request.form.get("new_password2", "") admin_hash = get_setting("admin_password_hash") if not check_password_hash(admin_hash, old_p): flash("原密码错误") elif len(new_p) < 6: flash("新密码至少 6 位") elif new_p != new_p2: flash("两次新密码不一致") else: set_setting("admin_password_hash", generate_password_hash(new_p)) flash("密码修改成功") return redirect(url_for("settings")) webhook = get_setting("wechat_webhook") username = get_setting("admin_username") return render_template( "settings.html", webhook=webhook, username=username, quote_label=get_quote_source_label(), ) # —————————————— 启动 —————————————— if __name__ == "__main__": threading.Thread(target=background_task, daemon=True).start() app.run(host=HOST, port=PORT, debug=DEBUG)