Files
crypto_monitor/journal_chart_lib.py
2026-05-27 15:32:46 +08:00

453 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""交易复盘 / 订单 K 线拼图(Binance / Gate / OKX 共用)。"""
import math
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
Image = None # type: ignore
ImageDraw = None # type: ignore
ImageFont = None # type: ignore
JOURNAL_CHART_TF_CHOICES = ("1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "12h", "1d")
JOURNAL_CHART_DEFAULT_TF1 = "15m"
JOURNAL_CHART_DEFAULT_TF2 = "1h"
JOURNAL_CHART_DEFAULT_LIMIT = 300
JOURNAL_CHART_LIMIT_MIN = 50
JOURNAL_CHART_LIMIT_MAX = 500
JOURNAL_CHART_ANCHOR_CLOSE = "close"
JOURNAL_CHART_ANCHOR_NOW = "now"
JOURNAL_CHART_DEFAULT_ANCHOR = JOURNAL_CHART_ANCHOR_CLOSE
def _load_font(size):
if not ImageFont:
return None
for name in ("msyh.ttc", "Microsoft YaHei.ttf", "arial.ttf", "Arial.ttf"):
try:
return ImageFont.truetype(name, size)
except Exception:
continue
try:
return ImageFont.load_default()
except Exception:
return None
def ohlcv_to_rows(ohlcv):
rows = []
for bar in ohlcv or []:
if not bar or len(bar) < 6:
continue
try:
rows.append(
{
"ts": int(bar[0]),
"o": float(bar[1]),
"h": float(bar[2]),
"l": float(bar[3]),
"c": float(bar[4]),
"v": float(bar[5]),
}
)
except Exception:
continue
return rows
def marker_tag_label(tag):
t = str(tag or "").strip().upper()
if t == "ENTRY":
return "开仓"
if t == "EXIT":
return "平仓"
if t == "STOP":
return "止损"
return str(tag or "")
def pick_marker_point(rows, target_ts_ms, target_price=None):
if not rows or target_ts_ms is None:
return None, None
idx = min(range(len(rows)), key=lambda i: abs(int(rows[i]["ts"]) - int(target_ts_ms)))
if target_price is not None:
try:
p = float(target_price)
if p > 0:
return idx, p
except Exception:
pass
return idx, float(rows[idx]["c"])
def parse_positive_price(raw):
if raw is None:
return None
s = str(raw).strip()
if not s:
return None
try:
p = float(s)
return p if p > 0 else None
except (TypeError, ValueError):
return None
def parse_journal_chart_anchor(raw):
s = str(raw or "").strip().lower()
if s in (JOURNAL_CHART_ANCHOR_NOW, "current", "当前", "当前时间"):
return JOURNAL_CHART_ANCHOR_NOW
return JOURNAL_CHART_ANCHOR_CLOSE
def parse_journal_chart_limit(raw, fallback=None):
fb = int(fallback if fallback is not None else JOURNAL_CHART_DEFAULT_LIMIT)
try:
n = int(str(raw or "").strip() or fb)
except (TypeError, ValueError):
n = fb
return max(JOURNAL_CHART_LIMIT_MIN, min(JOURNAL_CHART_LIMIT_MAX, n))
def normalize_chart_timeframe(raw):
tf = str(raw or "").strip().lower()
if tf in JOURNAL_CHART_TF_CHOICES:
return tf
return ""
def timeframe_period_ms(tf):
s = (tf or "").strip().lower()
if s.endswith("m"):
try:
return int(s[:-1]) * 60 * 1000
except ValueError:
pass
if s.endswith("h"):
try:
return int(s[:-1]) * 3600 * 1000
except ValueError:
pass
if s.endswith("d"):
try:
return int(s[:-1]) * 86400 * 1000
except ValueError:
pass
return 300000
def _to_int_ms(value):
if value is None:
return None
try:
v = int(value)
return v if v > 0 else None
except (TypeError, ValueError):
return None
def trade_review_fetch_window(entry_ts_ms, exit_ts_ms, timeframe, limit, anchor=None, now_ms=None):
"""
复盘 K 线窗口(anchor=close):
- 有开/平仓:从开仓前若干根起,到平仓 K 线止(覆盖整笔交易 + 入场前背景)
- 仅开仓:以开仓时间为终点向前 limit 根
- 仅平仓:以平仓时间为终点向前 limit 根
anchor=now:以当前时间为终点向前 limit 根(可看平仓后走势)
"""
period = timeframe_period_ms(timeframe)
lim = max(2, int(limit))
entry_ms = _to_int_ms(entry_ts_ms)
exit_ms = _to_int_ms(exit_ts_ms)
anch = (anchor or JOURNAL_CHART_DEFAULT_ANCHOR).strip().lower()
if anch == JOURNAL_CHART_ANCHOR_NOW:
end_ms = _to_int_ms(now_ms)
if not end_ms:
return None
since_ms = end_ms - period * (lim + 10)
return {
"since_ms": since_ms,
"end_ms": end_ms,
"window_start_ms": since_ms,
"fetch_limit": lim + 20,
"display_limit": lim,
}
if entry_ms and exit_ms:
if exit_ms < entry_ms:
entry_ms, exit_ms = exit_ms, entry_ms
span_bars = max(1, (exit_ms - entry_ms) // period + 1)
pre_bars = max(40, min(120, lim // 3))
need = span_bars + pre_bars
fetch_limit = min(JOURNAL_CHART_LIMIT_MAX, max(lim, need + 15))
since_ms = entry_ms - period * pre_bars
return {
"since_ms": since_ms,
"end_ms": exit_ms,
"window_start_ms": since_ms,
"fetch_limit": fetch_limit,
"display_limit": lim,
}
if entry_ms:
end_ms = entry_ms
since_ms = end_ms - period * (lim + 10)
return {
"since_ms": since_ms,
"end_ms": end_ms,
"window_start_ms": since_ms,
"fetch_limit": lim + 20,
"display_limit": lim,
}
if exit_ms:
end_ms = exit_ms
since_ms = end_ms - period * (lim + 10)
return {
"since_ms": since_ms,
"end_ms": end_ms,
"window_start_ms": since_ms,
"fetch_limit": lim + 20,
"display_limit": lim,
}
return None
def trim_rows_for_trade_review(rows, window):
if not window:
return list(rows or [])
start_ms = int(window["window_start_ms"])
end_ms = int(window["end_ms"])
lim = int(window["display_limit"])
filt = [r for r in (rows or []) if start_ms <= int(r["ts"]) <= end_ms]
if len(filt) > lim:
filt = filt[-lim:]
return filt
def parse_journal_chart_timeframes(tf1, tf2, fallback_tfs=None):
"""复盘表单:最多两个周期,去重保序。"""
out = []
for raw in (tf1, tf2):
tf = normalize_chart_timeframe(raw)
if tf and tf not in out:
out.append(tf)
if out:
return out[:2]
fb = [normalize_chart_timeframe(x) for x in (fallback_tfs or (JOURNAL_CHART_DEFAULT_TF1, JOURNAL_CHART_DEFAULT_TF2))]
fb = [x for x in fb if x]
return fb[:2] if fb else [JOURNAL_CHART_DEFAULT_TF1, JOURNAL_CHART_DEFAULT_TF2]
def marker_points_for_timeframe(rows, marker_payload):
points = []
if not marker_payload or not rows:
return points
entry_idx, entry_price = pick_marker_point(
rows, marker_payload.get("entry_ts_ms"), marker_payload.get("entry_price")
)
exit_idx, exit_price = pick_marker_point(
rows, marker_payload.get("exit_ts_ms"), marker_payload.get("exit_price")
)
if entry_idx is not None and entry_price is not None:
points.append({"idx": entry_idx, "price": entry_price, "tag": "ENTRY"})
if exit_idx is not None and exit_price is not None:
points.append({"idx": exit_idx, "price": exit_price, "tag": "EXIT"})
return points
def price_levels_from_marker_payload(marker_payload):
levels = []
if not marker_payload:
return levels
sl = parse_positive_price(marker_payload.get("stop_loss_price"))
if sl is not None:
levels.append({"price": sl, "label": "止损", "color": (255, 152, 0)})
return levels
def render_candles_subplot(
rows,
title,
width,
height,
bg_rgb=(255, 255, 255),
marker_points=None,
price_levels=None,
):
if not Image or not ImageDraw:
raise RuntimeError("缺少依赖:Pillowpip install Pillow")
img = Image.new("RGB", (width, height), bg_rgb)
draw = ImageDraw.Draw(img)
font = _load_font(14)
small = _load_font(12)
pad_l, pad_r, pad_t, pad_b = 46, 12, 26, 28
plot_w = max(10, width - pad_l - pad_r)
plot_h = max(10, height - pad_t - pad_b)
header_bg = (245, 247, 250)
draw.rectangle((0, 0, width, pad_t), fill=header_bg)
if font:
draw.text((10, 6), title, fill=(25, 35, 60), font=font)
else:
draw.text((10, 6), title, fill=(25, 35, 60))
if not rows:
if small:
draw.text((pad_l, pad_t + 10), "无K线数据", fill=(90, 100, 120), font=small)
else:
draw.text((pad_l, pad_t + 10), "无K线数据", fill=(90, 100, 120))
return img
lo = min(r["l"] for r in rows)
hi = max(r["h"] for r in rows)
for pl in price_levels or []:
try:
p = float(pl.get("price"))
if p > 0:
lo = min(lo, p)
hi = max(hi, p)
except (TypeError, ValueError):
pass
if hi <= lo:
hi = lo + 1e-12
n = len(rows)
marker_by_idx = {}
for mp in marker_points or []:
try:
idx = int(mp.get("idx"))
except Exception:
continue
if idx < 0 or idx >= n:
continue
marker_by_idx.setdefault(idx, []).append(mp)
x0 = pad_l
for i, r in enumerate(rows):
x1 = pad_l + int((i + 1) * plot_w / n)
x_mid = (x0 + x1) // 2
wick_x = x_mid
y_high = pad_t + int((hi - r["h"]) / (hi - lo) * plot_h)
y_low = pad_t + int((hi - r["l"]) / (hi - lo) * plot_h)
y_open = pad_t + int((hi - r["o"]) / (hi - lo) * plot_h)
y_close = pad_t + int((hi - r["c"]) / (hi - lo) * plot_h)
top = min(y_open, y_close)
bot = max(y_open, y_close)
up = r["c"] >= r["o"]
wick_color = (120, 120, 120)
edge_color = (20, 20, 20)
draw.line((wick_x, y_high, wick_x, y_low), fill=wick_color)
body_w = max(1, (x1 - x0) - 2)
left = x0 + 1
if bot - top < 2:
mid = (top + bot) // 2
draw.rectangle((left, mid, left + body_w, mid + 1), fill=edge_color)
else:
if up:
draw.rectangle((left, top, left + body_w, bot), fill=(255, 255, 255), outline=edge_color, width=1)
else:
draw.rectangle((left, top, left + body_w, bot), fill=edge_color, outline=edge_color, width=1)
for j, mp in enumerate(marker_by_idx.get(i, [])):
tag = str(mp.get("tag") or "")
label = marker_tag_label(tag)
m_price = float(mp.get("price") or r["c"])
y_m = pad_t + int((hi - m_price) / (hi - lo) * plot_h)
y_m = max(pad_t + 4, min(pad_t + plot_h - 4, y_m))
x_off = (j - (len(marker_by_idx[i]) - 1) / 2.0) * 14
x_draw = int(x_mid + x_off)
if tag == "ENTRY":
m_color = (0, 195, 95)
tri = [(x_draw, y_m - 20), (x_draw - 9, y_m - 4), (x_draw + 9, y_m - 4)]
text_y = y_m - 36
else:
m_color = (235, 65, 65)
tri = [(x_draw, y_m + 20), (x_draw - 9, y_m + 4), (x_draw + 9, y_m + 4)]
text_y = y_m + 12
draw.ellipse((x_draw - 5, y_m - 5, x_draw + 5, y_m + 5), fill=m_color, outline=(255, 255, 255), width=1)
draw.polygon(tri, fill=m_color)
draw.line((x_draw, y_m, x_draw, y_m - 16 if tag == "ENTRY" else y_m + 16), fill=m_color, width=3)
if font:
draw.text((x_draw + 8, text_y), label, fill=m_color, font=font)
else:
draw.text((x_draw + 8, text_y), label, fill=m_color)
x0 = x1
x_right = pad_l + plot_w
for pl in price_levels or []:
try:
p = float(pl.get("price"))
except (TypeError, ValueError):
continue
if p <= 0:
continue
y_sl = pad_t + int((hi - p) / (hi - lo) * plot_h)
color = tuple(pl.get("color") or (255, 152, 0))
label = str(pl.get("label") or "止损")
for xx in range(pad_l, x_right, 10):
draw.line((xx, y_sl, min(xx + 6, x_right), y_sl), fill=color, width=2)
if font:
draw.text((x_right - 72, y_sl - 18), label, fill=color, font=small or font)
else:
draw.text((x_right - 72, y_sl - 18), label, fill=color)
if len(marker_points or []) >= 2:
try:
entry = next((m for m in marker_points if m.get("tag") == "ENTRY"), None)
exitp = next((m for m in marker_points if m.get("tag") == "EXIT"), None)
if entry is not None and exitp is not None:
ex_i, ex_p = int(entry["idx"]), float(entry["price"])
xx_i, xx_p = int(exitp["idx"]), float(exitp["price"])
x_ex = pad_l + int((ex_i + 0.5) * plot_w / n)
x_xx = pad_l + int((xx_i + 0.5) * plot_w / n)
y_ex = pad_t + int((hi - ex_p) / (hi - lo) * plot_h)
y_xx = pad_t + int((hi - xx_p) / (hi - lo) * plot_h)
draw.line((x_ex, y_ex, x_xx, y_xx), fill=(35, 135, 255), width=3)
except Exception:
pass
if small:
draw.text((width - 210, height - 22), f"L={lo:.6g} H={hi:.6g}", fill=(120, 125, 135), font=small)
return img
def compose_chart_panels(panels, layout="grid", cell_w=980, cell_h=520, gap=10):
if not panels or not Image:
return None
if layout == "vertical":
cols = 1
rows_n = len(panels)
else:
cols = 2
rows_n = int(math.ceil(len(panels) / cols))
w = cols * cell_w + (cols - 1) * gap
h = rows_n * cell_h + (rows_n - 1) * gap
out = Image.new("RGB", (w, h), (255, 255, 255))
idx = 0
for r in range(rows_n):
for c in range(cols):
if idx >= len(panels):
break
x = c * (cell_w + gap)
y = r * (cell_h + gap)
out.paste(panels[idx], (x, y))
idx += 1
if ImageDraw and layout != "vertical" and rows_n >= 1:
draw_out = ImageDraw.Draw(out)
line_col = (220, 225, 232)
x_mid = cell_w + gap // 2
if w > x_mid >= 0:
draw_out.line((x_mid, 0, x_mid, h), fill=line_col, width=2)
for rr in range(1, rows_n):
y_mid = rr * cell_h + (rr - 1) * gap + gap // 2
if 0 <= y_mid <= h:
draw_out.line((0, y_mid, w, y_mid), fill=line_col, width=2)
elif ImageDraw and layout == "vertical" and rows_n >= 2:
draw_out = ImageDraw.Draw(out)
line_col = (220, 225, 232)
for rr in range(1, rows_n):
y_mid = rr * cell_h + (rr - 1) * gap + gap // 2
if 0 <= y_mid <= h:
draw_out.line((0, y_mid, w, y_mid), fill=line_col, width=2)
return out