Files
qihuo/fee_specs.py
T
dekun 52aca456e9 Add PostgreSQL production backend to eliminate SQLite lock contention.
Support DATABASE_URL with connection pooling, pg_dump backups, SQLite migration script, and deploy_postgres.sh with docs.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-07-01 08:11:42 +08:00

386 lines
12 KiB
Python

# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""期货手续费:仅 CTP 柜台同步入库,前端只读展示。"""
import json
import os
import re
from datetime import datetime
from typing import Optional
from contract_specs import get_contract_spec
from db_conn import connect_db, is_benign_migration_error
# All paths are resolved relative to the directory containing this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))

DB_PATH = os.path.join(_MODULE_DIR, "futures.db")
DATA_DIR = os.path.join(_MODULE_DIR, "data")
DEFAULT_JSON = os.path.join(DATA_DIR, "fee_rates.json")

# Fallback used when no rate is configured
# (already roughly 2x the exchange standard).
DEFAULT_FEE = dict(
    open_fixed=2.0,
    open_ratio=0.0,
    close_yesterday_fixed=2.0,
    close_yesterday_ratio=0.0,
    close_today_fixed=4.0,
    close_today_ratio=0.0,
)

# Product prefixes that receive the ratio-based fallback in get_fee_spec.
_INDEX_PRODUCTS = {"im", "ic", "ih", "if"}
def product_from_code(ths_code: str) -> str:
    """Return the lower-cased leading ASCII letters of a contract code.

    Surrounding whitespace is ignored. An empty string is returned when the
    code is empty/None or does not start with a letter.
    """
    stripped = (ths_code or "").strip()
    leading_letters = re.match(r"^([A-Za-z]+)", stripped)
    if leading_letters is None:
        return ""
    return leading_letters.group(1).lower()
def _get_db():
    """Open and return a database connection via ``db_conn.connect_db``."""
    connection = connect_db()
    return connection
def ensure_fee_rates_schema(conn=None) -> None:
    """Bring the ``fee_rates`` table up to date (older databases may lack ``source``).

    Args:
        conn: An open connection to reuse. When omitted, a connection is
            opened here and closed again before returning; a caller-supplied
            connection is committed but left open.

    Raises:
        Exception: Any migration error that ``is_benign_migration_error``
            does not classify as benign.
    """
    owns_connection = conn is None
    if owns_connection:
        conn = _get_db()
    migrations = (
        "ALTER TABLE fee_rates ADD COLUMN source TEXT DEFAULT 'local'",
    )
    try:
        for statement in migrations:
            try:
                conn.execute(statement)
            except Exception as exc:
                # Benign failures are skipped; anything else propagates.
                if is_benign_migration_error(exc):
                    continue
                raise
        conn.commit()
    finally:
        if owns_connection:
            conn.close()
def get_setting(key: str, default: str = "") -> str:
    """Read one value from the ``settings`` table.

    Args:
        key: Setting name to look up.
        default: Value returned when the key is absent or its stored value
            is NULL or empty.

    Returns:
        The stored value, or ``default``.
    """
    conn = _get_db()
    try:
        row = conn.execute(
            "SELECT value FROM settings WHERE key=?", (key,)
        ).fetchone()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    if not row:
        return default
    # ``or`` covers both NULL and empty-string values.
    return row["value"] or default
def set_setting(key: str, value: str) -> None:
    """Insert or overwrite one row in the ``settings`` table and commit.

    Args:
        key: Setting name.
        value: Value to store for ``key``.
    """
    conn = _get_db()
    try:
        conn.execute(
            """INSERT INTO settings (key, value) VALUES (?,?)
ON CONFLICT(key) DO UPDATE SET value=excluded.value""",
            (key, value),
        )
        conn.commit()
    finally:
        # Close even when the write fails so the connection is not leaked.
        conn.close()
def get_fee_multiplier() -> float:
    """Return the ``fee_multiplier`` setting as a non-negative float.

    Returns:
        The stored value clamped to be at least 0.0, or 2.0 when the setting
        is missing, empty, or cannot be converted to a float.
    """
    conn = _get_db()
    try:
        row = conn.execute(
            "SELECT value FROM settings WHERE key='fee_multiplier'"
        ).fetchone()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    if row and row["value"]:
        try:
            return max(0.0, float(row["value"]))
        except (TypeError, ValueError):
            # Unparseable value: fall through to the default below.
            pass
    return 2.0
def get_fee_source_mode() -> str:
    """Return the fee source mode, which is fixed to the CTP counter."""
    mode = "ctp"
    return mode
def purge_non_ctp_fee_rates() -> int:
    """Delete cached fee rates whose source is not ``'ctp'`` and commit.

    Rows with a NULL source are deleted as well (NULL is coalesced to '').

    Returns:
        The cursor's ``rowcount`` for the DELETE statement.
    """
    conn = _get_db()
    try:
        cur = conn.execute(
            "DELETE FROM fee_rates WHERE COALESCE(source, '') != 'ctp'"
        )
        deleted = cur.rowcount
        conn.commit()
    finally:
        # Close even when the delete fails so the connection is not leaked.
        conn.close()
    return deleted
def _row_to_spec(row, mult: int) -> dict:
return {
"product": row["product"],
"exchange": row["exchange"] or "",
"mult": int(row["mult"] or mult),
"open_fixed": float(row["open_fixed"] or 0),
"open_ratio": float(row["open_ratio"] or 0),
"close_yesterday_fixed": float(row["close_yesterday_fixed"] or 0),
"close_yesterday_ratio": float(row["close_yesterday_ratio"] or 0),
"close_today_fixed": float(row["close_today_fixed"] or 0),
"close_today_ratio": float(row["close_today_ratio"] or 0),
"source": row["source"] if "source" in row.keys() else "local",
}
def get_fee_spec(ths_code: str, *, trading_mode: str = "simulation") -> dict:
    """Resolve the fee specification for a contract code.

    Resolution order:
      1. A stored ``fee_rates`` row for the product with ``source='ctp'``.
      2. An on-demand ``ctp_fee_sync.sync_fee_for_symbol`` call (best effort;
         any failure falls through to the next step).
      3. A hard-coded ratio-based fallback for products in
         ``_INDEX_PRODUCTS``.
      4. ``DEFAULT_FEE``.

    Args:
        ths_code: Contract code; the product is its leading letters.
        trading_mode: Passed through to ``sync_fee_for_symbol``.

    Returns:
        A fee-spec dict. The built-in fallbacks carry ``source="default"``.
    """
    product = product_from_code(ths_code)
    mult = get_contract_spec(ths_code)["mult"]
    if not product:
        return {
            **DEFAULT_FEE,
            "mult": mult,
            "product": "",
            "exchange": "",
            "source": "default",
        }

    conn = _get_db()
    try:
        ensure_fee_rates_schema(conn)
        row = conn.execute(
            "SELECT * FROM fee_rates WHERE product=? AND source='ctp'",
            (product,),
        ).fetchone()
    finally:
        # Close even if the schema check or query raises (no leaked connection).
        conn.close()
    if row:
        return _row_to_spec(row, mult)

    try:
        from ctp_fee_sync import sync_fee_for_symbol

        fields = sync_fee_for_symbol(trading_mode, ths_code)
        if fields:
            return {"product": product, **fields}
    except Exception:
        # Deliberately best-effort: a failed sync must not break fee lookup,
        # but leave a trace instead of swallowing the error silently.
        import logging

        logging.getLogger(__name__).debug(
            "CTP fee sync failed for %s; using built-in fallback",
            ths_code,
            exc_info=True,
        )

    if product in _INDEX_PRODUCTS:
        return {
            "product": product,
            "exchange": "CFFEX",
            "mult": mult,
            "open_fixed": 0.0,
            "open_ratio": 0.000092,
            "close_yesterday_fixed": 0.0,
            "close_yesterday_ratio": 0.000092,
            "close_today_fixed": 0.0,
            "close_today_ratio": 0.000276,
            # Labelled like the other built-in fallbacks; without this key
            # calc_fee_breakdown reported these hard-coded rates as "local".
            "source": "default",
        }
    return {
        "product": product,
        "exchange": "",
        "mult": mult,
        **DEFAULT_FEE,
        "source": "default",
    }
def calc_side_fee(
    price: float,
    lots: float,
    mult: int,
    fixed: float,
    ratio: float,
) -> float:
    """Compute the fee for one side (open or close) of a trade.

    The fee is a per-lot fixed amount plus a ratio of the traded notional
    (``price * mult * lots``). A falsy ``lots`` is treated as 1.0; falsy
    ``fixed`` / ``ratio`` are treated as 0.0.
    """
    effective_lots = lots if lots else 1.0
    per_lot_fixed = fixed if fixed else 0.0
    notional_ratio = ratio if ratio else 0.0
    fixed_part = per_lot_fixed * effective_lots
    ratio_part = notional_ratio * price * mult * effective_lots
    return fixed_part + ratio_part
def is_same_day(open_time: str, close_time: str) -> bool:
if not open_time or not close_time:
return True
o = open_time.strip().replace(" ", "T")[:10]
c = close_time.strip().replace(" ", "T")[:10]
return o == c
def calc_round_trip_fee(
    ths_code: str,
    entry_price: float,
    close_price: float,
    lots: float,
    open_time: str = "",
    close_time: str = "",
    trading_mode: str = "simulation",
) -> float:
    """Return the total open-plus-close fee for a trade, rounded to 2 decimals.

    The close side uses the close-today rates when ``is_same_day`` reports
    the open and close timestamps as the same day, otherwise the
    close-yesterday rates. Returns 0.0 without looking up any rates when
    either price is missing or zero. A falsy ``lots`` counts as 1.0.
    """
    if not (entry_price and close_price):
        return 0.0

    spec = get_fee_spec(ths_code, trading_mode=trading_mode)
    mult = spec["mult"]
    lots = lots or 1.0

    open_fee = calc_side_fee(
        entry_price, lots, mult, spec["open_fixed"], spec["open_ratio"]
    )

    if is_same_day(open_time, close_time):
        fixed_key, ratio_key = "close_today_fixed", "close_today_ratio"
    else:
        fixed_key, ratio_key = "close_yesterday_fixed", "close_yesterday_ratio"
    close_fee = calc_side_fee(
        close_price, lots, mult, spec[fixed_key], spec[ratio_key]
    )

    return round(open_fee + close_fee, 2)
def calc_fee_breakdown(
    ths_code: str,
    entry_price: float,
    close_price: float,
    lots: float,
    open_time: str = "",
    close_time: str = "",
    trading_mode: str = "simulation",
) -> dict:
    """Return the open/close fee breakdown for a trade.

    Unlike ``calc_round_trip_fee``, a missing or zero price does not
    short-circuit to a zero total: the ratio part becomes zero but the fixed
    per-lot fees are still charged.

    Args:
        ths_code: Contract code used to look up the fee spec.
        entry_price: Open price; ``None`` is treated as 0.0.
        close_price: Close price; ``None`` is treated as 0.0.
        lots: Number of lots; a falsy value counts as 1.0.
        open_time: Open timestamp, used only to pick the close rate.
        close_time: Close timestamp, used only to pick the close rate.
        trading_mode: Passed through to ``get_fee_spec``.

    Returns:
        Dict with ``open_fee``, ``close_fee`` and ``total_fee`` (rounded to
        2 decimals), ``close_type`` ("平今" for same-day, "平昨" otherwise),
        ``same_day`` and ``fee_source``.
    """
    spec = get_fee_spec(ths_code, trading_mode=trading_mode)
    mult = spec["mult"]
    lots = lots or 1.0
    # A None price would make ``ratio * price`` raise TypeError inside
    # calc_side_fee; coerce it to 0.0. Numeric inputs are unaffected.
    entry_price = entry_price or 0.0
    close_price = close_price or 0.0

    open_fee = calc_side_fee(
        entry_price, lots, mult, spec["open_fixed"], spec["open_ratio"],
    )
    same_day = is_same_day(open_time, close_time)
    if same_day:
        close_fee = calc_side_fee(
            close_price, lots, mult,
            spec["close_today_fixed"], spec["close_today_ratio"],
        )
        close_type = "平今"
    else:
        close_fee = calc_side_fee(
            close_price, lots, mult,
            spec["close_yesterday_fixed"], spec["close_yesterday_ratio"],
        )
        close_type = "平昨"
    total = round(open_fee + close_fee, 2)
    return {
        "open_fee": round(open_fee, 2),
        "close_fee": round(close_fee, 2),
        "close_type": close_type,
        "total_fee": total,
        "same_day": same_day,
        "fee_source": spec.get("source", "local"),
    }
def load_fee_rates_from_json(path: Optional[str] = None) -> int:
    """Upsert fee rates from a JSON file into the ``fee_rates`` table.

    The file is expected to hold an object mapping product codes to objects
    of fee fields. Entries whose value is not an object are skipped.

    Args:
        path: JSON file to read; defaults to ``DEFAULT_JSON``.

    Returns:
        Number of rows upserted. 0 when the file does not exist or its
        top-level value is not an object.
    """
    path = path or DEFAULT_JSON
    if not os.path.isfile(path):
        return 0
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    # A non-object root has no product entries; bail out before opening a
    # connection instead of failing on ``.items()``.
    if not isinstance(data, dict):
        return 0

    upsert_sql = """INSERT INTO fee_rates
(product, exchange, mult,
open_fixed, open_ratio,
close_yesterday_fixed, close_yesterday_ratio,
close_today_fixed, close_today_ratio, updated_at, source)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(product) DO UPDATE SET
exchange=excluded.exchange, mult=excluded.mult,
open_fixed=excluded.open_fixed, open_ratio=excluded.open_ratio,
close_yesterday_fixed=excluded.close_yesterday_fixed,
close_yesterday_ratio=excluded.close_yesterday_ratio,
close_today_fixed=excluded.close_today_fixed,
close_today_ratio=excluded.close_today_ratio,
updated_at=excluded.updated_at,
source=excluded.source"""
    now = datetime.now().isoformat(timespec="seconds")
    count = 0
    conn = _get_db()
    try:
        for product, item in data.items():
            if not isinstance(item, dict):
                continue
            conn.execute(
                upsert_sql,
                (
                    product.lower(),
                    item.get("exchange", ""),
                    # Fall back to the contract multiplier when none is given.
                    int(item.get("mult") or get_contract_spec(product)["mult"]),
                    float(item.get("open_fixed") or 0),
                    float(item.get("open_ratio") or 0),
                    float(item.get("close_yesterday_fixed") or 0),
                    float(item.get("close_yesterday_ratio") or 0),
                    float(item.get("close_today_fixed") or 0),
                    float(item.get("close_today_ratio") or 0),
                    now,
                    item.get("source", "json"),
                ),
            )
            count += 1
        conn.commit()
    finally:
        # Close even when a row fails so the connection is not leaked.
        conn.close()
    return count
def list_ctp_fee_rates() -> list:
    """Return all ``fee_rates`` rows with ``source='ctp'``, ordered by product.

    Used by the fee page, which shows CTP-synced results only.

    Returns:
        A list of dicts, one per row.
    """
    conn = _get_db()
    try:
        rows = conn.execute(
            "SELECT * FROM fee_rates WHERE source='ctp' ORDER BY product"
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(r) for r in rows]
def list_all_fee_rates() -> list:
    """Return every ``fee_rates`` row regardless of source, ordered by product.

    Returns:
        A list of dicts, one per row.
    """
    conn = _get_db()
    try:
        rows = conn.execute(
            "SELECT * FROM fee_rates ORDER BY product"
        ).fetchall()
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return [dict(r) for r in rows]
def list_fee_rates_for_ui() -> list:
    """Return the fee rows shown in the UI (delegates to ``list_ctp_fee_rates``)."""
    ui_rows = list_ctp_fee_rates()
    return ui_rows
def count_fee_rates_by_source() -> dict[str, int]:
    """Count stored fee rates per source; only ``'ctp'`` is reported.

    Returns:
        ``{"ctp": <number of rows with source='ctp'>}``.
    """
    conn = _get_db()
    try:
        n = conn.execute(
            "SELECT COUNT(*) FROM fee_rates WHERE source='ctp'"
        ).fetchone()[0]
    finally:
        # Close even when the query raises so the connection is not leaked.
        conn.close()
    return {"ctp": int(n or 0)}
def upsert_fee_rate(product: str, fields: dict) -> None:
    """Insert or overwrite the ``fee_rates`` row for one product and commit.

    Args:
        product: Product code; stored lower-cased and stripped.
        fields: Fee fields. Recognised keys are ``exchange``, ``mult``, the
            six ``open_*`` / ``close_yesterday_*`` / ``close_today_*``
            values and ``source`` (default ``"manual"``). Missing or falsy
            numeric fields are stored as 0; a missing ``mult`` falls back to
            the contract multiplier from ``get_contract_spec``.
    """
    product = product.lower().strip()
    now = datetime.now().isoformat(timespec="seconds")
    source = fields.get("source", "manual")
    # Use the contract multiplier when none is supplied, as the JSON loader
    # does. A hard-coded 10 would be stored and then preferred by
    # _row_to_spec over the real contract multiplier.
    mult = int(fields.get("mult") or get_contract_spec(product)["mult"])
    conn = _get_db()
    try:
        conn.execute(
            """INSERT INTO fee_rates
(product, exchange, mult,
open_fixed, open_ratio,
close_yesterday_fixed, close_yesterday_ratio,
close_today_fixed, close_today_ratio, updated_at, source)
VALUES (?,?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(product) DO UPDATE SET
exchange=excluded.exchange, mult=excluded.mult,
open_fixed=excluded.open_fixed, open_ratio=excluded.open_ratio,
close_yesterday_fixed=excluded.close_yesterday_fixed,
close_yesterday_ratio=excluded.close_yesterday_ratio,
close_today_fixed=excluded.close_today_fixed,
close_today_ratio=excluded.close_today_ratio,
updated_at=excluded.updated_at,
source=excluded.source""",
            (
                product,
                fields.get("exchange", ""),
                mult,
                float(fields.get("open_fixed") or 0),
                float(fields.get("open_ratio") or 0),
                float(fields.get("close_yesterday_fixed") or 0),
                float(fields.get("close_yesterday_ratio") or 0),
                float(fields.get("close_today_fixed") or 0),
                float(fields.get("close_today_ratio") or 0),
                now,
                source,
            ),
        )
        conn.commit()
    finally:
        # Close even when the write fails so the connection is not leaked.
        conn.close()