refactor: 移除 gate_bot,统一为三所架构并更新文档

删除 crypto_monitor_gate_bot 目录,中控与子代理改为 binance/okx/gate 三账户;
文档与 UI 文案「四所」改为「三所」;新增清库前一次性配置备份脚本。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-04 22:00:08 +08:00
parent be51eee73f
commit 9f67de3677
138 changed files with 26395 additions and 40057 deletions
+1 -1
View File
@@ -1,4 +1,4 @@
"""账户冷静期 / 日冻结风控(所实例共用)。"""
"""账户冷静期 / 日冻结风控(所实例共用)。"""
from __future__ import annotations
import os
+140 -140
View File
@@ -1,140 +1,140 @@
"""单日开仓次数:软提醒阈值 + 硬上限(所实例共用)。"""
from __future__ import annotations
import os
from typing import Any, Optional
def parse_daily_open_alert_threshold(raw: Any = None, *, default: int = 5) -> int:
"""AI 克制提醒阈值;至少 1。"""
try:
v = int(raw if raw is not None and str(raw).strip() != "" else default)
except (TypeError, ValueError):
v = default
return max(1, v)
def parse_daily_open_hard_limit(raw: Any = None, *, default: int = 0) -> int:
"""硬上限;0 表示不启用。至少 0。"""
try:
v = int(raw if raw is not None and str(raw).strip() != "" else default)
except (TypeError, ValueError):
v = default
return max(0, v)
def load_daily_open_limits_from_env(
env: Optional[dict[str, str]] = None,
) -> tuple[int, int]:
"""从环境变量读取 (alert_threshold, hard_limit)。"""
src = env if env is not None else os.environ
alert = parse_daily_open_alert_threshold(src.get("DAILY_OPEN_ALERT_THRESHOLD"))
hard = parse_daily_open_hard_limit(src.get("DAILY_OPEN_HARD_LIMIT"))
return alert, hard
def count_opens_for_trading_day(conn, trading_day: str) -> int:
"""本交易日已成功写入 order_monitors 的开仓次数。"""
td = (trading_day or "").strip()
if not td:
return 0
row = conn.execute(
"SELECT COUNT(*) FROM order_monitors WHERE session_date=?",
(td,),
).fetchone()
return int(row[0] if row else 0)
def daily_open_hard_limit_blocks(opens_today: int, hard_limit: int) -> bool:
return int(hard_limit) > 0 and int(opens_today) >= int(hard_limit)
def hard_limit_block_reason(opens_today: int, hard_limit: int, reset_hour: int) -> str:
return (
f"本交易日开仓次数已达上限({int(opens_today)}/{int(hard_limit)}),"
f"次日北京时间 {int(reset_hour)}:00 后恢复"
)
def check_daily_open_hard_limit(
conn,
trading_day: str,
hard_limit: int,
reset_hour: int,
) -> tuple[bool, str, int]:
"""返回 (允许继续开仓, 拒绝原因, 当日已开次数)。"""
opens_today = count_opens_for_trading_day(conn, trading_day)
if daily_open_hard_limit_blocks(opens_today, hard_limit):
return False, hard_limit_block_reason(opens_today, hard_limit, reset_hour), opens_today
return True, "", opens_today
def can_trade_new_open(
*,
time_allows: bool,
active_count: int,
max_active_positions: int,
opens_today: int,
hard_limit: int,
extra_blocks: bool = False,
) -> bool:
if extra_blocks:
return False
if not time_allows:
return False
if int(active_count) >= int(max_active_positions):
return False
if daily_open_hard_limit_blocks(opens_today, hard_limit):
return False
return True
def should_send_daily_open_alert(before: int, after: int, alert_threshold: int) -> bool:
return int(before) < int(alert_threshold) <= int(after)
def build_daily_open_alert_prompt(
trading_day: str,
opens_after: int,
alert_threshold: int,
*,
hard_limit: int = 0,
detail_line: str = "",
) -> str:
hard_txt = (
f"硬上限 {hard_limit} 次(已达后将禁止新开仓直至下一交易日)。"
if int(hard_limit) > 0
else "未配置单日硬上限。"
)
extra = f" {detail_line}" if detail_line else ""
return (
f"用户在北京时间交易日 {trading_day} 已累计开仓 {opens_after}"
f"AI 提醒阈值 {alert_threshold}{hard_txt}"
f"{extra}"
f"用户自述“上头了”。请给克制提醒。"
)
def format_daily_open_counter_line(
opens_today: int,
alert_threshold: int,
hard_limit: int,
) -> str:
if int(hard_limit) > 0:
return (
f"📅 当日开仓次数:{int(opens_today)} / 硬上限 {int(hard_limit)}"
f"AI 提醒阈值 {int(alert_threshold)}"
)
return (
f"📅 当日开仓次数:{int(opens_today)} / AI 提醒阈值 {int(alert_threshold)}"
)
def format_daily_open_summary_short(
opens_today: int,
alert_threshold: int,
hard_limit: int,
) -> str:
if int(hard_limit) > 0:
return f"本交易日累计开仓:{int(opens_today)}(硬上限 {int(hard_limit)},提醒 {int(alert_threshold)}"
return f"本交易日累计开仓:{int(opens_today)}(提醒阈值 {int(alert_threshold)}"
"""单日开仓次数:软提醒阈值 + 硬上限(所实例共用)。"""
from __future__ import annotations
import os
from typing import Any, Optional
def parse_daily_open_alert_threshold(raw: Any = None, *, default: int = 5) -> int:
"""AI 克制提醒阈值;至少 1。"""
try:
v = int(raw if raw is not None and str(raw).strip() != "" else default)
except (TypeError, ValueError):
v = default
return max(1, v)
def parse_daily_open_hard_limit(raw: Any = None, *, default: int = 0) -> int:
"""硬上限;0 表示不启用。至少 0。"""
try:
v = int(raw if raw is not None and str(raw).strip() != "" else default)
except (TypeError, ValueError):
v = default
return max(0, v)
def load_daily_open_limits_from_env(
env: Optional[dict[str, str]] = None,
) -> tuple[int, int]:
"""从环境变量读取 (alert_threshold, hard_limit)。"""
src = env if env is not None else os.environ
alert = parse_daily_open_alert_threshold(src.get("DAILY_OPEN_ALERT_THRESHOLD"))
hard = parse_daily_open_hard_limit(src.get("DAILY_OPEN_HARD_LIMIT"))
return alert, hard
def count_opens_for_trading_day(conn, trading_day: str) -> int:
"""本交易日已成功写入 order_monitors 的开仓次数。"""
td = (trading_day or "").strip()
if not td:
return 0
row = conn.execute(
"SELECT COUNT(*) FROM order_monitors WHERE session_date=?",
(td,),
).fetchone()
return int(row[0] if row else 0)
def daily_open_hard_limit_blocks(opens_today: int, hard_limit: int) -> bool:
return int(hard_limit) > 0 and int(opens_today) >= int(hard_limit)
def hard_limit_block_reason(opens_today: int, hard_limit: int, reset_hour: int) -> str:
return (
f"本交易日开仓次数已达上限({int(opens_today)}/{int(hard_limit)}),"
f"次日北京时间 {int(reset_hour)}:00 后恢复"
)
def check_daily_open_hard_limit(
conn,
trading_day: str,
hard_limit: int,
reset_hour: int,
) -> tuple[bool, str, int]:
"""返回 (允许继续开仓, 拒绝原因, 当日已开次数)。"""
opens_today = count_opens_for_trading_day(conn, trading_day)
if daily_open_hard_limit_blocks(opens_today, hard_limit):
return False, hard_limit_block_reason(opens_today, hard_limit, reset_hour), opens_today
return True, "", opens_today
def can_trade_new_open(
*,
time_allows: bool,
active_count: int,
max_active_positions: int,
opens_today: int,
hard_limit: int,
extra_blocks: bool = False,
) -> bool:
if extra_blocks:
return False
if not time_allows:
return False
if int(active_count) >= int(max_active_positions):
return False
if daily_open_hard_limit_blocks(opens_today, hard_limit):
return False
return True
def should_send_daily_open_alert(before: int, after: int, alert_threshold: int) -> bool:
return int(before) < int(alert_threshold) <= int(after)
def build_daily_open_alert_prompt(
trading_day: str,
opens_after: int,
alert_threshold: int,
*,
hard_limit: int = 0,
detail_line: str = "",
) -> str:
hard_txt = (
f"硬上限 {hard_limit} 次(已达后将禁止新开仓直至下一交易日)。"
if int(hard_limit) > 0
else "未配置单日硬上限。"
)
extra = f" {detail_line}" if detail_line else ""
return (
f"用户在北京时间交易日 {trading_day} 已累计开仓 {opens_after}"
f"AI 提醒阈值 {alert_threshold}{hard_txt}"
f"{extra}"
f"用户自述“上头了”。请给克制提醒。"
)
def format_daily_open_counter_line(
opens_today: int,
alert_threshold: int,
hard_limit: int,
) -> str:
if int(hard_limit) > 0:
return (
f"📅 当日开仓次数:{int(opens_today)} / 硬上限 {int(hard_limit)}"
f"AI 提醒阈值 {int(alert_threshold)}"
)
return (
f"📅 当日开仓次数:{int(opens_today)} / AI 提醒阈值 {int(alert_threshold)}"
)
def format_daily_open_summary_short(
opens_today: int,
alert_threshold: int,
hard_limit: int,
) -> str:
if int(hard_limit) > 0:
return f"本交易日累计开仓:{int(opens_today)}(硬上限 {int(hard_limit)},提醒 {int(alert_threshold)}"
return f"本交易日累计开仓:{int(opens_today)}(提醒阈值 {int(alert_threshold)}"
+136 -136
View File
@@ -1,136 +1,136 @@
"""
所共用:计仓模式 risk(以损定仓)| full_margin(全仓杠杆)。
仅 env POSITION_SIZING_MODE 切换;须无持仓(由部署流程保证)。
"""
from __future__ import annotations
import os
from typing import Any, Optional, Tuple
MODE_RISK = "risk"
MODE_FULL_MARGIN = "full_margin"
VALID_MODES = frozenset({MODE_RISK, MODE_FULL_MARGIN})
OPEN_SOURCE_MANUAL = "manual"
OPEN_SOURCE_KEY_AUTO = "key_auto"
OPEN_SOURCE_KEY_FIB = "key_fib"
OPEN_SOURCE_KEY_TRIGGER = "key_trigger"
OPEN_SOURCE_TREND = "trend"
OPEN_SOURCE_ROLL = "roll"
FULL_MARGIN_BLOCKED_SOURCES = frozenset(
{OPEN_SOURCE_KEY_AUTO, OPEN_SOURCE_KEY_FIB, OPEN_SOURCE_TREND, OPEN_SOURCE_ROLL}
)
def normalize_position_sizing_mode(raw: Optional[str]) -> str:
v = (raw or MODE_RISK).strip().lower()
if v in ("full", "full_margin", "fullmargin", "全仓", "全仓杠杆"):
return MODE_FULL_MARGIN
return MODE_RISK if v in ("risk", "r", "以损定仓", "") else MODE_RISK
def load_position_sizing_mode(env: Optional[dict] = None) -> str:
e = env if env is not None else os.environ
return normalize_position_sizing_mode(e.get("POSITION_SIZING_MODE"))
def is_full_margin_mode(mode: str) -> bool:
return normalize_position_sizing_mode(mode) == MODE_FULL_MARGIN
def mode_label_zh(mode: str) -> str:
return "全仓杠杆" if is_full_margin_mode(mode) else "以损定仓"
def leverage_for_full_margin(symbol: str, btc_leverage: int, alt_leverage: int) -> int:
sym = (symbol or "").strip().upper()
if sym.startswith("BTC") or sym.startswith("ETH"):
return max(1, int(btc_leverage or 10))
return max(1, int(alt_leverage or 5))
def round_funds(value: float, decimals: int = 2) -> float:
return round(float(value), int(decimals))
def risk_percent_for_storage(mode: str, risk_percent: float) -> Optional[float]:
"""全仓杠杆:库内不写风险百分比(仅 risk_amount U)。"""
if is_full_margin_mode(mode):
return None
return risk_percent
def format_risk_display_text(
mode: str,
risk_percent: Optional[float],
risk_amount: Optional[float],
*,
decimals: int = 2,
) -> str:
"""持仓/通知「风险」文案:全仓仅 U;以损定仓为 %≈U。"""
amt: Optional[float] = None
if risk_amount is not None and risk_amount != "":
try:
amt = float(risk_amount)
except (TypeError, ValueError):
amt = None
if is_full_margin_mode(mode):
if amt is None:
return ""
return f"{round_funds(amt, decimals)}U"
pct: Optional[float] = None
if risk_percent is not None and risk_percent != "":
try:
pct = float(risk_percent)
except (TypeError, ValueError):
pct = None
pct_txt = f"{pct:g}" if pct is not None else ""
amt_txt = round_funds(amt, decimals) if amt is not None else ""
return f"{pct_txt}%≈{amt_txt}U"
def assert_open_source_allowed(mode: str, source: str) -> Tuple[bool, str]:
if not is_full_margin_mode(mode):
return True, ""
src = (source or "").strip().lower()
if src in FULL_MARGIN_BLOCKED_SOURCES:
return False, (
"当前为全仓杠杆模式(POSITION_SIZING_MODE=full_margin),"
"不允许关键位突破/斐波自动开仓、趋势回调与顺势加仓;"
"仅支持实盘人工下单与阻力/支撑提醒。"
)
return True, ""
def full_margin_requires_flat_position(active_count: int) -> Tuple[bool, str]:
if active_count > 0:
return False, "全仓杠杆模式仅允许单仓且无其它持仓,请先平仓后再开仓"
return True, ""
def compute_full_margin_sizing(
*,
symbol: str,
available_usdt: float,
capital_base: float,
buffer_ratio: float,
btc_leverage: int,
alt_leverage: int,
funds_decimals: int = 2,
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
if available_usdt is None or float(available_usdt) <= 0:
return None, "全仓杠杆:无法读取合约账户可用保证金"
lev = leverage_for_full_margin(symbol, btc_leverage, alt_leverage)
margin = round_funds(float(available_usdt) * float(buffer_ratio), funds_decimals)
if margin <= 0:
return None, "全仓杠杆:可用保证金不足"
notional = round_funds(margin * lev, funds_decimals)
ratio = round(margin / float(capital_base) * 100, 2) if capital_base else 0.0
return {
"margin_capital": margin,
"leverage": lev,
"notional_value": notional,
"position_ratio": ratio,
"mode": MODE_FULL_MARGIN,
}, None
"""
所共用:计仓模式 risk(以损定仓)| full_margin(全仓杠杆)。
仅 env POSITION_SIZING_MODE 切换;须无持仓(由部署流程保证)。
"""
from __future__ import annotations
import os
from typing import Any, Optional, Tuple
MODE_RISK = "risk"
MODE_FULL_MARGIN = "full_margin"
VALID_MODES = frozenset({MODE_RISK, MODE_FULL_MARGIN})
OPEN_SOURCE_MANUAL = "manual"
OPEN_SOURCE_KEY_AUTO = "key_auto"
OPEN_SOURCE_KEY_FIB = "key_fib"
OPEN_SOURCE_KEY_TRIGGER = "key_trigger"
OPEN_SOURCE_TREND = "trend"
OPEN_SOURCE_ROLL = "roll"
FULL_MARGIN_BLOCKED_SOURCES = frozenset(
{OPEN_SOURCE_KEY_AUTO, OPEN_SOURCE_KEY_FIB, OPEN_SOURCE_TREND, OPEN_SOURCE_ROLL}
)
def normalize_position_sizing_mode(raw: Optional[str]) -> str:
v = (raw or MODE_RISK).strip().lower()
if v in ("full", "full_margin", "fullmargin", "全仓", "全仓杠杆"):
return MODE_FULL_MARGIN
return MODE_RISK if v in ("risk", "r", "以损定仓", "") else MODE_RISK
def load_position_sizing_mode(env: Optional[dict] = None) -> str:
e = env if env is not None else os.environ
return normalize_position_sizing_mode(e.get("POSITION_SIZING_MODE"))
def is_full_margin_mode(mode: str) -> bool:
return normalize_position_sizing_mode(mode) == MODE_FULL_MARGIN
def mode_label_zh(mode: str) -> str:
return "全仓杠杆" if is_full_margin_mode(mode) else "以损定仓"
def leverage_for_full_margin(symbol: str, btc_leverage: int, alt_leverage: int) -> int:
sym = (symbol or "").strip().upper()
if sym.startswith("BTC") or sym.startswith("ETH"):
return max(1, int(btc_leverage or 10))
return max(1, int(alt_leverage or 5))
def round_funds(value: float, decimals: int = 2) -> float:
return round(float(value), int(decimals))
def risk_percent_for_storage(mode: str, risk_percent: float) -> Optional[float]:
"""全仓杠杆:库内不写风险百分比(仅 risk_amount U)。"""
if is_full_margin_mode(mode):
return None
return risk_percent
def format_risk_display_text(
mode: str,
risk_percent: Optional[float],
risk_amount: Optional[float],
*,
decimals: int = 2,
) -> str:
"""持仓/通知「风险」文案:全仓仅 U;以损定仓为 %≈U。"""
amt: Optional[float] = None
if risk_amount is not None and risk_amount != "":
try:
amt = float(risk_amount)
except (TypeError, ValueError):
amt = None
if is_full_margin_mode(mode):
if amt is None:
return ""
return f"{round_funds(amt, decimals)}U"
pct: Optional[float] = None
if risk_percent is not None and risk_percent != "":
try:
pct = float(risk_percent)
except (TypeError, ValueError):
pct = None
pct_txt = f"{pct:g}" if pct is not None else ""
amt_txt = round_funds(amt, decimals) if amt is not None else ""
return f"{pct_txt}%≈{amt_txt}U"
def assert_open_source_allowed(mode: str, source: str) -> Tuple[bool, str]:
if not is_full_margin_mode(mode):
return True, ""
src = (source or "").strip().lower()
if src in FULL_MARGIN_BLOCKED_SOURCES:
return False, (
"当前为全仓杠杆模式(POSITION_SIZING_MODE=full_margin),"
"不允许关键位突破/斐波自动开仓、趋势回调与顺势加仓;"
"仅支持实盘人工下单与阻力/支撑提醒。"
)
return True, ""
def full_margin_requires_flat_position(active_count: int) -> Tuple[bool, str]:
if active_count > 0:
return False, "全仓杠杆模式仅允许单仓且无其它持仓,请先平仓后再开仓"
return True, ""
def compute_full_margin_sizing(
*,
symbol: str,
available_usdt: float,
capital_base: float,
buffer_ratio: float,
btc_leverage: int,
alt_leverage: int,
funds_decimals: int = 2,
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
if available_usdt is None or float(available_usdt) <= 0:
return None, "全仓杠杆:无法读取合约账户可用保证金"
lev = leverage_for_full_margin(symbol, btc_leverage, alt_leverage)
margin = round_funds(float(available_usdt) * float(buffer_ratio), funds_decimals)
if margin <= 0:
return None, "全仓杠杆:可用保证金不足"
notional = round_funds(margin * lev, funds_decimals)
ratio = round(margin / float(capital_base) * 100, 2) if capital_base else 0.0
return {
"margin_capital": margin,
"leverage": lev,
"notional_value": notional,
"position_ratio": ratio,
"mode": MODE_FULL_MARGIN,
}, None
+229 -229
View File
@@ -1,229 +1,229 @@
"""平仓交易:交易所口径双边成交额与手续费(所共用聚合逻辑)。"""
from __future__ import annotations
from typing import Any, Callable, Optional
def _coerce_ts_ms(raw: Any) -> int | None:
if raw in (None, ""):
return None
try:
v = int(raw)
return v if v > 1_000_000_000_000 else v * 1000
except (TypeError, ValueError):
return None
def quote_turnover_usdt_from_fill(trade: dict, *, contract_size: float = 1.0) -> float:
"""单笔成交的报价币成交额(USDT 口径)。"""
info = trade.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in ("quoteQty", "quote_qty", "fillNotionalUsd", "notional"):
try:
v = float(info.get(key) or 0)
if v > 0:
return abs(v)
except (TypeError, ValueError):
continue
try:
cost = float(trade.get("cost") or 0)
if cost > 0:
return abs(cost)
except (TypeError, ValueError):
pass
try:
price = float(trade.get("price") or 0)
amount = float(trade.get("amount") or 0) * float(contract_size or 1.0)
if price > 0 and amount > 0:
return abs(price * amount)
except (TypeError, ValueError):
pass
return 0.0
def commission_usdt_from_fill(trade: dict) -> float:
"""单笔成交手续费(正数表示成本)。"""
fee = trade.get("fee")
if isinstance(fee, dict):
try:
cost = float(fee.get("cost") or 0)
except (TypeError, ValueError):
cost = 0.0
if cost != 0:
cur = str(fee.get("currency") or "USDT").upper()
if cur in ("USDT", "USD", "BUSD", "USDC"):
return abs(cost)
return abs(cost)
info = trade.get("info") or {}
if isinstance(info, dict):
for key in ("fee", "commission", "fillFee"):
try:
v = float(info.get(key) or 0)
if v != 0:
return abs(v)
except (TypeError, ValueError):
continue
return 0.0
def aggregate_bilateral_stats(
fills: list[dict],
*,
contract_size: float = 1.0,
) -> dict[str, float] | None:
"""双边成交额 = 开+平所有相关 fill 的报价币成交额之和;手续费 = fill fee 之和。"""
if not fills:
return None
turnover = 0.0
commission = 0.0
for t in fills:
turnover += quote_turnover_usdt_from_fill(t, contract_size=contract_size)
commission += commission_usdt_from_fill(t)
if turnover <= 0 and commission <= 0:
return None
return {
"exchange_turnover_usdt": round(turnover, 4),
"exchange_commission_usdt": round(commission, 4),
}
def filter_position_lifecycle_fills(
trades: list[dict],
direction: str,
open_ms: int | None,
close_ms: int | None,
*,
hedge_mode: bool = False,
close_buffer_ms: int = 15 * 60 * 1000,
) -> list[dict]:
"""
持仓生命周期内 fill:多=开买+平卖;空=开卖+平买。
hedge_mode 时按 posSide 与 direction 过滤。
"""
direction = (direction or "long").strip().lower()
open_side = "buy" if direction == "long" else "sell"
close_side = "sell" if direction == "long" else "buy"
allowed_sides = {open_side, close_side}
upper = int(close_ms) + int(close_buffer_ms) if close_ms else None
out: list[dict] = []
for t in trades or []:
side = (t.get("side") or "").lower()
if side not in allowed_sides:
continue
ts = _coerce_ts_ms(t.get("timestamp"))
if ts is None:
continue
if open_ms and ts < int(open_ms) - 60_000:
continue
if upper and ts > upper:
continue
if hedge_mode:
info = t.get("info") or {}
if not isinstance(info, dict):
info = {}
pos_side = (info.get("posSide") or t.get("posSide") or "").lower()
if pos_side in ("long", "short") and pos_side != direction:
continue
out.append(t)
out.sort(key=lambda x: x.get("timestamp") or 0)
return out
def sum_binance_commission_income(entries: list[dict], trade_ids: set[str] | None) -> float | None:
"""Binance income 流水中 COMMISSION 合计(负值取绝对值为成本)。"""
if not entries:
return None
total = 0.0
found = False
for e in entries:
it = (e.get("incomeType") or e.get("income_type") or "").strip()
if it != "COMMISSION":
continue
if trade_ids:
tid = str(e.get("tradeId") or e.get("trade_id") or "").strip()
if tid and tid not in trade_ids:
continue
try:
total += float(e.get("income") or 0)
found = True
except (TypeError, ValueError):
continue
if not found:
return None
return round(abs(total), 4)
def trade_ids_from_fills(fills: list[dict]) -> set[str]:
out: set[str] = set()
for t in fills or []:
info = t.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in ("id", "tradeId", "trade_id"):
raw = t.get(key) if key in t else info.get(key)
if raw is not None and str(raw).strip():
out.add(str(raw).strip())
break
return out
def merge_commission_prefer_income(
fill_commission: float,
income_commission: float | None,
) -> float:
if income_commission is not None and income_commission > 0:
return round(income_commission, 4)
return round(max(fill_commission, 0.0), 4)
def update_trade_record_stats_columns(
conn: Any,
trade_id: int,
turnover_usdt: float | None,
commission_usdt: float | None,
) -> None:
if turnover_usdt is None and commission_usdt is None:
return
conn.execute(
"""
UPDATE trade_records
SET exchange_turnover_usdt = COALESCE(?, exchange_turnover_usdt),
exchange_commission_usdt = COALESCE(?, exchange_commission_usdt)
WHERE id = ?
""",
(turnover_usdt, commission_usdt, int(trade_id)),
)
def attach_exchange_stats_to_trade(
conn: Any,
trade_id: int,
*,
fetch_fills: Callable[[], list[dict]],
contract_size: float = 1.0,
income_commission: float | None = None,
) -> dict[str, float] | None:
"""拉 fill 并写库;仅在新单平仓路径调用。"""
try:
fills = fetch_fills() or []
except Exception:
fills = []
stats = aggregate_bilateral_stats(fills, contract_size=contract_size)
if not stats and income_commission is None:
return None
turnover = stats.get("exchange_turnover_usdt") if stats else None
fill_comm = float(stats.get("exchange_commission_usdt") or 0) if stats else 0.0
commission = merge_commission_prefer_income(fill_comm, income_commission)
update_trade_record_stats_columns(
conn,
trade_id,
turnover,
commission if commission > 0 else None,
)
out = {}
if turnover is not None:
out["exchange_turnover_usdt"] = turnover
if commission > 0:
out["exchange_commission_usdt"] = commission
return out or None
"""平仓交易:交易所口径双边成交额与手续费(所共用聚合逻辑)。"""
from __future__ import annotations
from typing import Any, Callable, Optional
def _coerce_ts_ms(raw: Any) -> int | None:
if raw in (None, ""):
return None
try:
v = int(raw)
return v if v > 1_000_000_000_000 else v * 1000
except (TypeError, ValueError):
return None
def quote_turnover_usdt_from_fill(trade: dict, *, contract_size: float = 1.0) -> float:
"""单笔成交的报价币成交额(USDT 口径)。"""
info = trade.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in ("quoteQty", "quote_qty", "fillNotionalUsd", "notional"):
try:
v = float(info.get(key) or 0)
if v > 0:
return abs(v)
except (TypeError, ValueError):
continue
try:
cost = float(trade.get("cost") or 0)
if cost > 0:
return abs(cost)
except (TypeError, ValueError):
pass
try:
price = float(trade.get("price") or 0)
amount = float(trade.get("amount") or 0) * float(contract_size or 1.0)
if price > 0 and amount > 0:
return abs(price * amount)
except (TypeError, ValueError):
pass
return 0.0
def commission_usdt_from_fill(trade: dict) -> float:
"""单笔成交手续费(正数表示成本)。"""
fee = trade.get("fee")
if isinstance(fee, dict):
try:
cost = float(fee.get("cost") or 0)
except (TypeError, ValueError):
cost = 0.0
if cost != 0:
cur = str(fee.get("currency") or "USDT").upper()
if cur in ("USDT", "USD", "BUSD", "USDC"):
return abs(cost)
return abs(cost)
info = trade.get("info") or {}
if isinstance(info, dict):
for key in ("fee", "commission", "fillFee"):
try:
v = float(info.get(key) or 0)
if v != 0:
return abs(v)
except (TypeError, ValueError):
continue
return 0.0
def aggregate_bilateral_stats(
fills: list[dict],
*,
contract_size: float = 1.0,
) -> dict[str, float] | None:
"""双边成交额 = 开+平所有相关 fill 的报价币成交额之和;手续费 = fill fee 之和。"""
if not fills:
return None
turnover = 0.0
commission = 0.0
for t in fills:
turnover += quote_turnover_usdt_from_fill(t, contract_size=contract_size)
commission += commission_usdt_from_fill(t)
if turnover <= 0 and commission <= 0:
return None
return {
"exchange_turnover_usdt": round(turnover, 4),
"exchange_commission_usdt": round(commission, 4),
}
def filter_position_lifecycle_fills(
trades: list[dict],
direction: str,
open_ms: int | None,
close_ms: int | None,
*,
hedge_mode: bool = False,
close_buffer_ms: int = 15 * 60 * 1000,
) -> list[dict]:
"""
持仓生命周期内 fill:多=开买+平卖;空=开卖+平买。
hedge_mode 时按 posSide 与 direction 过滤。
"""
direction = (direction or "long").strip().lower()
open_side = "buy" if direction == "long" else "sell"
close_side = "sell" if direction == "long" else "buy"
allowed_sides = {open_side, close_side}
upper = int(close_ms) + int(close_buffer_ms) if close_ms else None
out: list[dict] = []
for t in trades or []:
side = (t.get("side") or "").lower()
if side not in allowed_sides:
continue
ts = _coerce_ts_ms(t.get("timestamp"))
if ts is None:
continue
if open_ms and ts < int(open_ms) - 60_000:
continue
if upper and ts > upper:
continue
if hedge_mode:
info = t.get("info") or {}
if not isinstance(info, dict):
info = {}
pos_side = (info.get("posSide") or t.get("posSide") or "").lower()
if pos_side in ("long", "short") and pos_side != direction:
continue
out.append(t)
out.sort(key=lambda x: x.get("timestamp") or 0)
return out
def sum_binance_commission_income(entries: list[dict], trade_ids: set[str] | None) -> float | None:
"""Binance income 流水中 COMMISSION 合计(负值取绝对值为成本)。"""
if not entries:
return None
total = 0.0
found = False
for e in entries:
it = (e.get("incomeType") or e.get("income_type") or "").strip()
if it != "COMMISSION":
continue
if trade_ids:
tid = str(e.get("tradeId") or e.get("trade_id") or "").strip()
if tid and tid not in trade_ids:
continue
try:
total += float(e.get("income") or 0)
found = True
except (TypeError, ValueError):
continue
if not found:
return None
return round(abs(total), 4)
def trade_ids_from_fills(fills: list[dict]) -> set[str]:
out: set[str] = set()
for t in fills or []:
info = t.get("info") or {}
if not isinstance(info, dict):
info = {}
for key in ("id", "tradeId", "trade_id"):
raw = t.get(key) if key in t else info.get(key)
if raw is not None and str(raw).strip():
out.add(str(raw).strip())
break
return out
def merge_commission_prefer_income(
fill_commission: float,
income_commission: float | None,
) -> float:
if income_commission is not None and income_commission > 0:
return round(income_commission, 4)
return round(max(fill_commission, 0.0), 4)
def update_trade_record_stats_columns(
conn: Any,
trade_id: int,
turnover_usdt: float | None,
commission_usdt: float | None,
) -> None:
if turnover_usdt is None and commission_usdt is None:
return
conn.execute(
"""
UPDATE trade_records
SET exchange_turnover_usdt = COALESCE(?, exchange_turnover_usdt),
exchange_commission_usdt = COALESCE(?, exchange_commission_usdt)
WHERE id = ?
""",
(turnover_usdt, commission_usdt, int(trade_id)),
)
def attach_exchange_stats_to_trade(
conn: Any,
trade_id: int,
*,
fetch_fills: Callable[[], list[dict]],
contract_size: float = 1.0,
income_commission: float | None = None,
) -> dict[str, float] | None:
"""拉 fill 并写库;仅在新单平仓路径调用。"""
try:
fills = fetch_fills() or []
except Exception:
fills = []
stats = aggregate_bilateral_stats(fills, contract_size=contract_size)
if not stats and income_commission is None:
return None
turnover = stats.get("exchange_turnover_usdt") if stats else None
fill_comm = float(stats.get("exchange_commission_usdt") or 0) if stats else 0.0
commission = merge_commission_prefer_income(fill_comm, income_commission)
update_trade_record_stats_columns(
conn,
trade_id,
turnover,
commission if commission > 0 else None,
)
out = {}
if turnover is not None:
out["exchange_turnover_usdt"] = turnover
if commission > 0:
out["exchange_commission_usdt"] = commission
return out or None