"""防重复提交:Flask session 短窗口去重(下单 / 关键位等)。""" from __future__ import annotations import time from typing import Any, Optional DEFAULT_SUBMIT_GUARD_TTL = 90.0 def _prune_locks(locks: dict, now: float) -> dict: return {k: float(v) for k, v in (locks or {}).items() if float(v) > now} def check_duplicate_submit( session: Any, scope: str, *, ttl: float = DEFAULT_SUBMIT_GUARD_TTL, ) -> Optional[str]: """ 同一 scope 在 ttl 秒内仅允许通过一次。 返回提示文案表示应拒绝;返回 None 表示可继续处理。 """ scope = (scope or "").strip() if not scope: return None now = time.time() locks = _prune_locks(session.get("_form_submit_guard") or {}, now) if scope in locks: return "请求正在处理或刚提交过,请勿重复点击(请等待页面刷新后再试)" locks[scope] = now + float(ttl) session["_form_submit_guard"] = locks try: session.modified = True except Exception: pass return None def submit_scope_add_order(symbol: str, direction: str) -> str: sym = (symbol or "").strip().upper() d = (direction or "").strip().lower() return f"add_order:{sym}:{d}" def submit_scope_add_key(symbol: str, monitor_type: str, direction: str) -> str: sym = (symbol or "").strip().upper() mt = (monitor_type or "").strip() d = (direction or "").strip().lower() or "watch" return f"add_key:{sym}:{mt}:{d}"