Files
qihuo/ai_client.py
T
dekun 840e88daad Add key-level auto trade, AI analysis, and trading UX improvements.
Key monitors use 5m close triggers with WeChat alerts and box/convergence auto orders; add pending-order worker, structured WeChat notify, AI settings/messages, session clock, CTP margin sizing, and dual-layer position limits.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-28 10:36:56 +08:00

103 lines
4.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""AI 接口:Ollama / OpenAI 兼容 API。"""
from __future__ import annotations
import json
import logging
from typing import Callable, Optional
import requests
logger = logging.getLogger(__name__)
def is_ai_enabled(get_setting: Callable[[str, str], str]) -> bool:
return (get_setting("ai_enabled", "0") or "0").strip() in ("1", "true", "yes")
def get_ai_config(get_setting: Callable[[str, str], str]) -> dict:
provider = (get_setting("ai_provider", "ollama") or "ollama").strip().lower()
if provider not in ("ollama", "openai"):
provider = "ollama"
return {
"enabled": is_ai_enabled(get_setting),
"provider": provider,
"ollama_base_url": (get_setting("ai_ollama_base_url", "http://127.0.0.1:11434") or "").strip().rstrip("/"),
"ollama_model": (get_setting("ai_ollama_model", "qwen2.5:7b") or "qwen2.5:7b").strip(),
"openai_base_url": (get_setting("ai_openai_base_url", "https://api.openai.com/v1") or "").strip().rstrip("/"),
"openai_api_key": (get_setting("ai_openai_api_key", "") or "").strip(),
"openai_model": (get_setting("ai_openai_model", "gpt-4o-mini") or "gpt-4o-mini").strip(),
}
def chat_completion(
*,
get_setting: Callable[[str, str], str],
system_prompt: str,
user_prompt: str,
timeout: int = 120,
) -> tuple[bool, str]:
cfg = get_ai_config(get_setting)
if not cfg["enabled"]:
return False, "AI 未启用"
provider = cfg["provider"]
try:
if provider == "ollama":
url = f"{cfg['ollama_base_url']}/api/chat"
payload = {
"model": cfg["ollama_model"],
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"stream": False,
}
resp = requests.post(url, json=payload, timeout=timeout)
resp.raise_for_status()
data = resp.json()
msg = (data.get("message") or {}).get("content") or ""
return True, (msg or "").strip() or "AI 无回复)"
url = f"{cfg['openai_base_url']}/chat/completions"
headers = {
"Authorization": f"Bearer {cfg['openai_api_key']}",
"Content-Type": "application/json",
}
payload = {
"model": cfg["openai_model"],
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": 0.4,
}
resp = requests.post(url, headers=headers, json=payload, timeout=timeout)
resp.raise_for_status()
data = resp.json()
choices = data.get("choices") or []
if not choices:
return False, "AI 返回为空"
msg = (choices[0].get("message") or {}).get("content") or ""
return True, (msg or "").strip() or "AI 无回复)"
except Exception as exc:
logger.warning("AI chat failed (%s): %s", provider, exc)
return False, f"AI 调用失败:{exc}"
def analyze_trading_event(
*,
get_setting: Callable[[str, str], str],
event_kind: str,
payload: dict,
) -> tuple[bool, str]:
system = (
"你是国内期货交易复盘助手。根据提供的结构化交易数据,"
"用简洁中文给出 3~6 条要点:风险、纪律、改进建议。"
"不要编造未提供的数据;金额单位为元。"
)
user = f"事件类型:{event_kind}\n\n数据:\n{json.dumps(payload, ensure_ascii=False, indent=2)}"
return chat_completion(get_setting=get_setting, system_prompt=system, user_prompt=user)