重构期货监控系统:多页面导航、开单计划、Ubuntu PM2 部署

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-15 11:04:00 +08:00
commit ae480cb3e7
16 changed files with 1416 additions and 0 deletions
+519
View File
@@ -0,0 +1,519 @@
import os
import sqlite3
import time
import threading
import requests
from datetime import datetime
from typing import Optional
from functools import wraps
from dotenv import load_dotenv
from flask import (
Flask, render_template, request, redirect, url_for,
flash, session, jsonify,
)
from werkzeug.security import check_password_hash, generate_password_hash
from symbols import search_symbols, get_by_code
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "futures_monitor_default_secret")
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "6600"))
DEBUG = os.getenv("DEBUG", "false").lower() in ("1", "true", "yes")
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db")
# —————————————— 设置读写 ——————————————
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def get_setting(key: str, default: str = "") -> str:
conn = get_db()
row = conn.execute("SELECT value FROM settings WHERE key=?", (key,)).fetchone()
conn.close()
return row["value"] if row else default
def set_setting(key: str, value: str):
conn = get_db()
conn.execute(
"INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=?",
(key, value, value),
)
conn.commit()
conn.close()
def init_db():
conn = get_db()
c = conn.cursor()
c.execute("CREATE TABLE IF NOT EXISTS settings (key TEXT PRIMARY KEY, value TEXT)")
c.execute('''CREATE TABLE IF NOT EXISTS order_plans
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, direction TEXT,
zone_upper REAL, zone_lower REAL,
stop_loss REAL, take_profit REAL,
status TEXT DEFAULT "planned",
triggered_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS key_monitors
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT,
upper REAL, lower REAL,
upper_triggered INTEGER DEFAULT 0,
lower_triggered INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
c.execute('''CREATE TABLE IF NOT EXISTS trade_records
(id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT, symbol_name TEXT, monitor_type TEXT, direction TEXT,
trigger_price REAL, stop_loss REAL, take_profit REAL,
result TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
migrations = [
"ALTER TABLE key_monitors ADD COLUMN symbol_name TEXT",
"ALTER TABLE key_monitors ADD COLUMN upper_triggered INTEGER DEFAULT 0",
"ALTER TABLE key_monitors ADD COLUMN lower_triggered INTEGER DEFAULT 0",
"ALTER TABLE trade_records ADD COLUMN symbol_name TEXT",
]
for sql in migrations:
try:
c.execute(sql)
except sqlite3.OperationalError:
pass
conn.commit()
conn.close()
if not get_setting("admin_username"):
username = os.getenv("ADMIN_USERNAME", "admin")
password = os.getenv("ADMIN_PASSWORD", "admin123")
set_setting("admin_username", username)
set_setting("admin_password_hash", generate_password_hash(password))
if not get_setting("wechat_webhook") and os.getenv("WECHAT_WEBHOOK"):
set_setting("wechat_webhook", os.getenv("WECHAT_WEBHOOK"))
init_db()
# —————————————— 推送 ——————————————
def send_wechat_msg(content: str):
webhook = get_setting("wechat_webhook")
if not webhook:
return
full = f"【国内期货】\n{content}"
data = {"msgtype": "text", "text": {"content": full}}
try:
requests.post(webhook, json=data, timeout=10)
except Exception:
pass
# —————————————— 行情 ——————————————
def get_price(symbol: str) -> Optional[float]:
try:
url = f"https://hq.sinajs.cn/list={symbol}"
headers = {"Referer": "https://finance.sina.com.cn"}
resp = requests.get(url, headers=headers, timeout=5)
text = resp.text
if "=" not in text:
return None
data = text.split("=")[1].strip().strip('"').split(",")
if len(data) < 9:
return None
return float(data[8])
except Exception:
return None
# —————————————— 监控逻辑 ——————————————
def check_order_plans():
conn = get_db()
rows = conn.execute(
"SELECT * FROM order_plans WHERE status IN ('planned', 'active')"
).fetchall()
for r in rows:
sym = r["symbol"]
p = get_price(sym)
if not p:
continue
direction = r["direction"]
zone_upper = r["zone_upper"]
zone_lower = r["zone_lower"]
stop_loss = r["stop_loss"]
take_profit = r["take_profit"]
status = r["status"]
pid = r["id"]
name = r["symbol_name"] or sym
# 计划状态:价格进入决策区间则激活并通知
if status == "planned":
in_zone = zone_lower <= p <= zone_upper
if in_zone:
msg = (
f"【开单计划触发】{name} ({sym})\n"
f"方向:{'做多' if direction == 'long' else '做空'}\n"
f"决策区间:{zone_lower} ~ {zone_upper}\n"
f"当前价:{p}\n"
f"止损:{stop_loss} 止盈:{take_profit}"
)
send_wechat_msg(msg)
conn.execute(
"UPDATE order_plans SET status='active', triggered_at=? WHERE id=?",
(datetime.now().isoformat(), pid),
)
status = "active"
# 激活状态:监控止盈止损
if status == "active":
res = None
if direction == "long":
if p >= take_profit:
res = "止盈"
elif p <= stop_loss:
res = "止损"
elif direction == "short":
if p <= take_profit:
res = "止盈"
elif p >= stop_loss:
res = "止损"
if res:
msg = (
f"[{'做多' if direction == 'long' else '做空'}] {name}{res}\n"
f"决策区间:{zone_lower} ~ {zone_upper}\n"
f"止损:{stop_loss} 止盈:{take_profit}\n"
f"当前价:{p}"
)
send_wechat_msg(msg)
conn.execute(
"""INSERT INTO trade_records
(symbol, symbol_name, monitor_type, direction,
trigger_price, stop_loss, take_profit, result)
VALUES (?,?,?,?,?,?,?,?)""",
(sym, name, "开单计划", direction, p, stop_loss, take_profit, res),
)
conn.execute(
"UPDATE order_plans SET status='closed' WHERE id=?", (pid,)
)
conn.commit()
conn.close()
def check_key_monitors():
conn = get_db()
rows = conn.execute("SELECT * FROM key_monitors").fetchall()
for r in rows:
sym = r["symbol"]
typ = r["monitor_type"]
up = r["upper"]
low = r["lower"]
up_trig = r["upper_triggered"]
low_trig = r["lower_triggered"]
name = r["symbol_name"] or sym
pid = r["id"]
p = get_price(sym)
if not p:
continue
if typ in ("箱体突破", "收敛突破"):
if p > up and not up_trig:
send_wechat_msg(f"{name} 突破{typ}上沿 {up}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET upper_triggered=1 WHERE id=?", (pid,)
)
if p < low and not low_trig:
send_wechat_msg(f"{name} 跌破{typ}下沿 {low}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET lower_triggered=1 WHERE id=?", (pid,)
)
elif typ == "关键阻力位" and p > up and not up_trig:
send_wechat_msg(f"{name} 突破阻力位 {up}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET upper_triggered=1 WHERE id=?", (pid,)
)
elif typ == "关键支撑位" and p < low and not low_trig:
send_wechat_msg(f"{name} 跌破支撑位 {low}\n当前价:{p}")
conn.execute(
"UPDATE key_monitors SET lower_triggered=1 WHERE id=?", (pid,)
)
conn.commit()
conn.close()
def background_task():
while True:
try:
check_key_monitors()
check_order_plans()
except Exception:
pass
time.sleep(3)
# —————————————— 登录 ——————————————
def login_required(f):
@wraps(f)
def wrap(*args, **kwargs):
if not session.get("logged_in"):
return redirect(url_for("login"))
return f(*args, **kwargs)
return wrap
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
u = request.form.get("username", "").strip()
p = request.form.get("password", "")
admin_u = get_setting("admin_username")
admin_hash = get_setting("admin_password_hash")
if u == admin_u and check_password_hash(admin_hash, p):
session["logged_in"] = True
session["username"] = u
return redirect(url_for("plans"))
flash("账号或密码错误")
return render_template("login.html")
@app.route("/logout")
def logout():
session.clear()
return redirect(url_for("login"))
# —————————————— API ——————————————
@app.route("/api/symbols/search")
@login_required
def api_symbol_search():
q = request.args.get("q", "")
return jsonify(search_symbols(q))
# —————————————— 页面路由 ——————————————
@app.route("/")
@login_required
def index():
return redirect(url_for("plans"))
@app.route("/plans")
@login_required
def plans():
conn = get_db()
plan_list = conn.execute(
"SELECT * FROM order_plans WHERE status != 'closed' ORDER BY id DESC"
).fetchall()
closed = conn.execute(
"SELECT * FROM order_plans WHERE status='closed' ORDER BY id DESC LIMIT 20"
).fetchall()
conn.close()
return render_template("plans.html", plans=plan_list, closed=closed)
@app.route("/add_plan", methods=["POST"])
@login_required
def add_plan():
d = request.form
direction = d.get("direction")
symbol = d.get("symbol", "").strip()
symbol_name = d.get("symbol_name", "").strip()
if not direction:
flash("请选择多空方向")
return redirect(url_for("plans"))
if not symbol:
flash("请选择有效品种")
return redirect(url_for("plans"))
conn = get_db()
conn.execute(
"""INSERT INTO order_plans
(symbol, symbol_name, direction, zone_upper, zone_lower, stop_loss, take_profit)
VALUES (?,?,?,?,?,?,?)""",
(
symbol, symbol_name, direction,
float(d["zone_upper"]), float(d["zone_lower"]),
float(d["stop_loss"]), float(d["take_profit"]),
),
)
conn.commit()
conn.close()
flash("开单计划已添加")
return redirect(url_for("plans"))
@app.route("/del_plan/<int:pid>")
@login_required
def del_plan(pid):
conn = get_db()
conn.execute("DELETE FROM order_plans WHERE id=?", (pid,))
conn.commit()
conn.close()
flash("已删除")
return redirect(url_for("plans"))
@app.route("/keys")
@login_required
def keys():
conn = get_db()
key_list = conn.execute("SELECT * FROM key_monitors ORDER BY id DESC").fetchall()
conn.close()
return render_template("keys.html", keys=key_list)
@app.route("/add_key", methods=["POST"])
@login_required
def add_key():
d = request.form
direction = d.get("direction")
symbol = d.get("symbol", "").strip()
symbol_name = d.get("symbol_name", "").strip()
if not direction:
flash("请选择多空方向")
return redirect(url_for("keys"))
if not symbol:
flash("请选择有效品种")
return redirect(url_for("keys"))
conn = get_db()
conn.execute(
"""INSERT INTO key_monitors
(symbol, symbol_name, monitor_type, direction, upper, lower)
VALUES (?,?,?,?,?,?)""",
(symbol, symbol_name, d["type"], direction, float(d["upper"]), float(d["lower"])),
)
conn.commit()
conn.close()
flash("关键位监控已添加")
return redirect(url_for("keys"))
@app.route("/del_key/<int:pid>")
@login_required
def del_key(pid):
conn = get_db()
conn.execute("DELETE FROM key_monitors WHERE id=?", (pid,))
conn.commit()
conn.close()
flash("已删除")
return redirect(url_for("keys"))
@app.route("/records")
@login_required
def records():
conn = get_db()
record_list = conn.execute(
"SELECT * FROM trade_records ORDER BY id DESC"
).fetchall()
conn.close()
return render_template("records.html", records=record_list)
@app.route("/del_record/<int:rid>")
@login_required
def del_record(rid):
conn = get_db()
conn.execute("DELETE FROM trade_records WHERE id=?", (rid,))
conn.commit()
conn.close()
flash("已删除")
return redirect(url_for("records"))
@app.route("/stats")
@login_required
def stats():
conn = get_db()
total = conn.execute(
"SELECT COUNT(*) FROM trade_records WHERE result IN ('止盈','止损')"
).fetchone()[0]
win = conn.execute(
"SELECT COUNT(*) FROM trade_records WHERE result='止盈'"
).fetchone()[0]
loss = conn.execute(
"SELECT COUNT(*) FROM trade_records WHERE result='止损'"
).fetchone()[0]
rate = round(win / total * 100, 2) if total else 0
by_symbol = conn.execute(
"""SELECT symbol_name, symbol, COUNT(*) as cnt,
SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins
FROM trade_records WHERE result IN ('止盈','止损')
GROUP BY symbol ORDER BY cnt DESC"""
).fetchall()
by_type = conn.execute(
"""SELECT monitor_type, COUNT(*) as cnt,
SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins
FROM trade_records WHERE result IN ('止盈','止损')
GROUP BY monitor_type ORDER BY cnt DESC"""
).fetchall()
by_direction = conn.execute(
"""SELECT direction, COUNT(*) as cnt,
SUM(CASE WHEN result='止盈' THEN 1 ELSE 0 END) as wins
FROM trade_records WHERE result IN ('止盈','止损')
GROUP BY direction"""
).fetchall()
recent = conn.execute(
"SELECT * FROM trade_records ORDER BY id DESC LIMIT 10"
).fetchall()
conn.close()
return render_template(
"stats.html",
total=total, win=win, loss=loss, rate=rate,
by_symbol=by_symbol, by_type=by_type, by_direction=by_direction,
recent=recent,
)
@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
if request.method == "POST":
action = request.form.get("action")
if action == "wechat":
webhook = request.form.get("wechat_webhook", "").strip()
set_setting("wechat_webhook", webhook)
flash("企业微信配置已保存")
elif action == "password":
old_p = request.form.get("old_password", "")
new_p = request.form.get("new_password", "")
new_p2 = request.form.get("new_password2", "")
admin_hash = get_setting("admin_password_hash")
if not check_password_hash(admin_hash, old_p):
flash("原密码错误")
elif len(new_p) < 6:
flash("新密码至少 6 位")
elif new_p != new_p2:
flash("两次新密码不一致")
else:
set_setting("admin_password_hash", generate_password_hash(new_p))
flash("密码修改成功")
return redirect(url_for("settings"))
webhook = get_setting("wechat_webhook")
username = get_setting("admin_username")
return render_template("settings.html", webhook=webhook, username=username)
# —————————————— 启动 ——————————————
if __name__ == "__main__":
threading.Thread(target=background_task, daemon=True).start()
app.run(host=HOST, port=PORT, debug=DEBUG)