a35a08d2f6
Co-authored-by: Cursor <cursoragent@cursor.com>
258 lines
7.7 KiB
Python
258 lines
7.7 KiB
Python
"""复盘 K 线:新浪拉取 + matplotlib 生成截图。"""
|
|
import json
|
|
import logging
|
|
import os
|
|
import re
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
from zoneinfo import ZoneInfo
|
|
|
|
import requests
|
|
|
|
from symbols import ths_to_codes
|
|
|
|
logger = logging.getLogger(__name__)
|
|
TZ = ZoneInfo("Asia/Shanghai")
|
|
|
|
PERIOD_MINUTES = {
|
|
"1m": "1",
|
|
"3m": "3",
|
|
"5m": "5",
|
|
"15m": "15",
|
|
"30m": "30",
|
|
"1h": "60",
|
|
"4h": "240",
|
|
}
|
|
|
|
|
|
def ths_to_sina_chart_symbol(symbol: str) -> Optional[str]:
|
|
"""ag2608 -> AG2608(新浪 K 线接口合约代码)。"""
|
|
code = (symbol or "").strip()
|
|
if not code:
|
|
return None
|
|
codes = ths_to_codes(code)
|
|
if codes:
|
|
sina = codes.get("sina_code", "")
|
|
if sina.startswith("nf_"):
|
|
return sina[3:]
|
|
if sina.startswith("CFF_RE_"):
|
|
return sina[7:]
|
|
ths = codes.get("ths_code", "")
|
|
return ths.upper() if ths else None
|
|
m = re.match(r"^([A-Za-z]+)(\d+)$", code)
|
|
if m:
|
|
return m.group(1).upper() + m.group(2)
|
|
return None
|
|
|
|
|
|
def _parse_jsonp(text: str) -> Optional[list]:
|
|
m = re.search(r"\((.*)\)\s*;?\s*$", text.strip(), re.DOTALL)
|
|
if not m:
|
|
return None
|
|
try:
|
|
data = json.loads(m.group(1))
|
|
return data if isinstance(data, list) else None
|
|
except json.JSONDecodeError:
|
|
return None
|
|
|
|
|
|
def fetch_sina_klines(symbol: str, period: str) -> list:
|
|
"""拉取新浪期货分钟 K 线。"""
|
|
chart_sym = ths_to_sina_chart_symbol(symbol)
|
|
if not chart_sym:
|
|
return []
|
|
if period == "1d":
|
|
return _fetch_sina_daily(chart_sym)
|
|
typ = PERIOD_MINUTES.get(period)
|
|
if not typ:
|
|
return []
|
|
ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S")
|
|
url = (
|
|
"https://stock2.finance.sina.com.cn/futures/api/jsonp.php/"
|
|
f"var_{chart_sym}_{typ}_{ts}=/InnerFuturesNewService.getFewMinLine"
|
|
f"?symbol={chart_sym}&type={typ}"
|
|
)
|
|
try:
|
|
resp = requests.get(
|
|
url,
|
|
timeout=20,
|
|
headers={"Referer": "https://finance.sina.com.cn"},
|
|
)
|
|
bars = _parse_jsonp(resp.text)
|
|
return bars or []
|
|
except Exception as exc:
|
|
logger.warning("fetch kline failed %s %s: %s", chart_sym, period, exc)
|
|
return []
|
|
|
|
|
|
def _fetch_sina_daily(chart_sym: str) -> list:
|
|
url = (
|
|
"https://stock2.finance.sina.com.cn/futures/api/json.php/"
|
|
f"IndexService.getInnerFuturesDailyKLine?symbol={chart_sym}"
|
|
)
|
|
try:
|
|
resp = requests.get(url, timeout=20, headers={"Referer": "https://finance.sina.com.cn"})
|
|
raw = resp.json()
|
|
if not raw:
|
|
return []
|
|
out = []
|
|
for row in raw:
|
|
if isinstance(row, list) and len(row) >= 5:
|
|
out.append({
|
|
"d": row[0],
|
|
"o": row[1],
|
|
"h": row[2],
|
|
"l": row[3],
|
|
"c": row[4],
|
|
})
|
|
return out
|
|
except Exception as exc:
|
|
logger.warning("fetch daily kline failed %s: %s", chart_sym, exc)
|
|
return []
|
|
|
|
|
|
def _parse_dt(value: str) -> Optional[datetime]:
|
|
if not value:
|
|
return None
|
|
v = value.strip().replace("T", " ")
|
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
|
|
try:
|
|
return datetime.strptime(v, fmt).replace(tzinfo=TZ)
|
|
except ValueError:
|
|
continue
|
|
try:
|
|
return datetime.fromisoformat(value.strip()).replace(tzinfo=TZ)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _bar_datetime(bar: dict) -> Optional[datetime]:
|
|
d = bar.get("d")
|
|
if not d:
|
|
return None
|
|
try:
|
|
return datetime.strptime(d, "%Y-%m-%d %H:%M:%S").replace(tzinfo=TZ)
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def _select_bars(
|
|
bars: list,
|
|
cutoff: datetime,
|
|
count: int,
|
|
) -> list:
|
|
filtered = []
|
|
for bar in bars:
|
|
dt = _bar_datetime(bar)
|
|
if dt and dt <= cutoff:
|
|
filtered.append(bar)
|
|
if not filtered:
|
|
filtered = bars
|
|
if count > 0 and len(filtered) > count:
|
|
filtered = filtered[-count:]
|
|
return filtered
|
|
|
|
|
|
def generate_review_kline_chart(
|
|
symbol: str,
|
|
periods: list[str],
|
|
count: int,
|
|
cutoff_label: str,
|
|
open_time: str,
|
|
close_time: str,
|
|
entry_price: Optional[float],
|
|
stop_loss: Optional[float],
|
|
take_profit: Optional[float],
|
|
close_price: Optional[float],
|
|
upload_dir: str,
|
|
) -> Optional[str]:
|
|
"""生成双周期 K 线复盘图,返回 uploads 目录下的文件名。"""
|
|
import matplotlib
|
|
matplotlib.use("Agg")
|
|
import matplotlib.pyplot as plt
|
|
import matplotlib.dates as mdates
|
|
|
|
now = datetime.now(TZ)
|
|
if cutoff_label == "开仓时间":
|
|
cutoff = _parse_dt(open_time) or now
|
|
elif cutoff_label == "当前时间":
|
|
cutoff = now
|
|
else:
|
|
cutoff = _parse_dt(close_time) or now
|
|
|
|
open_dt = _parse_dt(open_time)
|
|
close_dt = _parse_dt(close_time)
|
|
|
|
valid_periods = [p for p in periods if p]
|
|
if not valid_periods:
|
|
valid_periods = ["15m", "1h"]
|
|
|
|
fig, axes = plt.subplots(
|
|
len(valid_periods), 1,
|
|
figsize=(14, 4.5 * len(valid_periods)),
|
|
facecolor="#0a0a10",
|
|
squeeze=False,
|
|
)
|
|
|
|
plotted = False
|
|
for idx, period in enumerate(valid_periods):
|
|
ax = axes[idx, 0]
|
|
bars = fetch_sina_klines(symbol, period)
|
|
bars = _select_bars(bars, cutoff, count)
|
|
if not bars:
|
|
ax.set_facecolor("#12121a")
|
|
ax.text(0.5, 0.5, f"No {period} data", ha="center", va="center", color="#888")
|
|
ax.set_xticks([])
|
|
ax.set_yticks([])
|
|
continue
|
|
|
|
times = [_bar_datetime(b) for b in bars]
|
|
closes = [float(b["c"]) for b in bars]
|
|
highs = [float(b["h"]) for b in bars]
|
|
lows = [float(b["l"]) for b in bars]
|
|
|
|
ax.set_facecolor("#12121a")
|
|
ax.plot(times, closes, color="#4cc2ff", linewidth=1.2)
|
|
ax.fill_between(
|
|
times, lows, highs,
|
|
color="#4cc2ff", alpha=0.12,
|
|
)
|
|
|
|
levels = [
|
|
(entry_price, "#eac147", "Entry"),
|
|
(stop_loss, "#ff6666", "SL"),
|
|
(take_profit, "#4cd97f", "TP"),
|
|
(close_price, "#c4c4ff", "Close"),
|
|
]
|
|
for price, color, label in levels:
|
|
if price is not None:
|
|
ax.axhline(price, color=color, linewidth=0.9, linestyle="--", alpha=0.85)
|
|
ax.text(times[-1], price, label, color=color, fontsize=8, va="bottom")
|
|
|
|
if open_dt:
|
|
ax.axvline(open_dt, color="#888", linewidth=0.8, linestyle=":", alpha=0.7)
|
|
if close_dt:
|
|
ax.axvline(close_dt, color="#aaa", linewidth=0.8, linestyle=":", alpha=0.7)
|
|
|
|
chart_sym = ths_to_sina_chart_symbol(symbol) or symbol
|
|
ax.set_title(f"{chart_sym} {period}", color="#eaeaea", fontsize=11, pad=8)
|
|
ax.tick_params(colors="#888", labelsize=8)
|
|
for spine in ax.spines.values():
|
|
spine.set_color("#2e2e45")
|
|
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d %H:%M"))
|
|
ax.grid(True, color="#1e1e30", linewidth=0.5)
|
|
plotted = True
|
|
|
|
if not plotted:
|
|
plt.close(fig)
|
|
return None
|
|
|
|
fig.tight_layout()
|
|
ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S")
|
|
chart_sym = ths_to_sina_chart_symbol(symbol) or "chart"
|
|
filename = f"{ts}_kline_{chart_sym}.png"
|
|
path = os.path.join(upload_dir, filename)
|
|
fig.savefig(path, dpi=120, facecolor=fig.get_facecolor())
|
|
plt.close(fig)
|
|
return filename
|