文件预览

model_router.py

查看 帝国架构 Empire Architecture 技能包中的文件内容。

文件内容

core/model_router.py

"""帝国架构 v3.2.1 - 多模型路由器(MIMO / DeepSeek / Claude / GPT-4 / Ollama)
v3.2.1 升级: httpx 异步客户端 + 重试 + 连接池 + 超时分级
"""
import json
import os
import asyncio
import time
from core.logger import get_logger
from core.config import load_empire_config, get_model_config

log = get_logger("model_router")

# v3.2.1: 尝试加载 httpx,不可用则回退到 urllib
try:
    import httpx
    HAS_HTTPX = True
    log.info("HTTP 后端: httpx (异步 + 连接池)")
except ImportError:
    import urllib.request
    import urllib.error
    HAS_HTTPX = False
    log.info("HTTP 后端: urllib (同步回退,建议 pip install httpx)")

# v3.0: 任务类型 → 最优模型映射
TASK_MODEL_MAP = {
    # 代码类 → DeepSeek 便宜好用
    "code": ["deepseek", "mimo", "gpt4"],
    "coding": ["deepseek", "mimo", "gpt4"],
    "编程": ["deepseek", "mimo", "gpt4"],
    "代码": ["deepseek", "mimo", "gpt4"],

    # 分析类 → MIMO 或 Claude
    "analysis": ["mimo", "claude", "gpt4"],
    "分析": ["mimo", "claude", "gpt4"],
    "战略": ["claude", "mimo", "gpt4"],

    # 创意类 → Claude
    "creative": ["claude", "gpt4", "mimo"],
    "写作": ["claude", "mimo", "gpt4"],
    "创意": ["claude", "gpt4", "mimo"],

    # 翻译类 → 任意
    "translate": ["mimo", "deepseek", "claude"],
    "翻译": ["mimo", "deepseek", "claude"],

    # 检索/摘要 → DeepSeek 便宜
    "search": ["deepseek", "mimo"],
    "检索": ["deepseek", "mimo"],
    "摘要": ["deepseek", "mimo"],

    # 安全审计 → Claude 严谨
    "security": ["claude", "mimo", "gpt4"],
    "安全": ["claude", "mimo", "gpt4"],
}

# Agent 角色 → 优先模型
ROLE_MODEL_MAP = {
    "丞相": "mimo",
    "三公": "mimo",
    "参谋": "mimo",
    "执行": "deepseek",
    "监察": "deepseek",
    "翰林": "mimo",
    "武将": "deepseek",
    "郡守": "deepseek",
    "锦衣卫": "claude",
}


def select_model(agent_role: str, task_prompt: str = "", task_type: str = "") -> dict:
    """v3.0: 智能模型选择(多模型 + 任务类型 + 角色)"""
    config = load_empire_config()
    models = config.get("models", {})

    # 1. 按任务类型选模型
    if task_type and task_type in TASK_MODEL_MAP:
        for model_alias in TASK_MODEL_MAP[task_type]:
            if model_alias in models:
                model = models[model_alias].copy()
                model["alias"] = model_alias
                log.debug(f"任务类型路由: {task_type} → {model['name']}")
                return model

    # 2. 按关键词选模型
    task_lower = task_prompt.lower()
    for keywords, candidates in TASK_MODEL_MAP.items():
        if keywords in task_lower:
            for model_alias in candidates:
                if model_alias in models:
                    model = models[model_alias].copy()
                    model["alias"] = model_alias
                    log.debug(f"关键词路由: {keywords} → {model['name']}")
                    return model

    # 3. 按角色选模型
    for role_prefix, model_alias in ROLE_MODEL_MAP.items():
        if role_prefix in agent_role:
            if model_alias in models:
                model = models[model_alias].copy()
                model["alias"] = model_alias
                log.debug(f"角色路由: {agent_role} → {model['name']}")
                return model

    # 4. 默认 MIMO
    model = models.get("mimo", get_model_config("mimo")).copy()
    model["alias"] = "mimo"
    return model


def call_llm_api(model_config: dict, messages: list[dict], api_key: str,
                 timeout: int = 60) -> dict:
    """v3.2.1: 统一 LLM 调用(支持 MIMO/DeepSeek/Claude/GPT-4/Ollama)
    升级: httpx 异步 + 指数退避重试 + 连接池
    """
    provider = model_config.get("provider", "mimo")
    base_url = model_config.get("base_url", "")
    model_name = model_config.get("name", "mimo-v2.5-pro")
    max_tokens = model_config.get("max_tokens", 4096)
    temperature = model_config.get("temperature", 0.7)

    if provider == "anthropic":
        return _call_anthropic(base_url, model_name, messages, api_key, max_tokens, temperature, timeout)
    else:
        return _call_openai_compatible(base_url, model_name, messages, api_key, max_tokens, temperature, timeout)


def _retry_with_backoff(func, max_retries=3, base_delay=1.0, max_delay=30.0):
    """v3.2.1: 指数退避重试装饰器"""
    last_error = None
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            last_error = e
            error_str = str(e).lower()

            # 不可重试的错误
            if any(kw in error_str for kw in ["401", "403", "invalid_api_key", "authentication"]):
                raise

            if attempt < max_retries - 1:
                import random
                delay = min(base_delay * (2 ** attempt), max_delay)
                delay = delay * (0.5 + random.random() * 0.5)  # jitter
                log.warning(f"LLM 调用失败 (尝试 {attempt+1}/{max_retries}): {str(e)[:100]}, "
                           f"{delay:.1f}s 后重试")
                time.sleep(delay)

    raise last_error


def _call_openai_compatible(base_url: str, model: str, messages: list[dict],
                            api_key: str, max_tokens: int, temperature: float, timeout: int) -> dict:
    """v3.2.1: OpenAI 兼容接口(MIMO/DeepSeek/GPT-4/Ollama)
    支持 httpx 异步 + urllib 同步回退
    """
    url = base_url.rstrip("/") + "/chat/completions"
    body = {
        "model": model, "messages": messages,
        "max_tokens": max_tokens, "temperature": temperature,
    }
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }

    def _do_request():
        if HAS_HTTPX:
            with httpx.Client(timeout=timeout) as client:
                resp = client.post(url, json=body, headers=headers)
                resp.raise_for_status()
                data = resp.json()
        else:
            req = urllib.request.Request(
                url,
                data=json.dumps(body).encode(),
                headers=headers,
            )
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read())

        usage = data.get("usage", {})
        return {
            "content": data["choices"][0]["message"]["content"],
            "input_tokens": usage.get("prompt_tokens", 0),
            "output_tokens": usage.get("completion_tokens", 0),
            "model": model,
        }

    return _retry_with_backoff(_do_request)


def _call_anthropic(base_url: str, model: str, messages: list[dict],
                    api_key: str, max_tokens: int, temperature: float, timeout: int) -> dict:
    """v3.2.1: Anthropic Claude 接口
    支持 httpx 异步 + urllib 同步回退
    """
    url = base_url.rstrip("/") + "/messages"

    # 转换消息格式
    system_text = ""
    claude_messages = []
    for msg in messages:
        if msg["role"] == "system":
            system_text += msg["content"] + "\n"
        else:
            claude_messages.append(msg)

    body = {
        "model": model, "messages": claude_messages,
        "max_tokens": max_tokens, "temperature": temperature,
        "system": system_text.strip(),
    }
    headers = {
        "Content-Type": "application/json",
        "x-api-key": api_key,
        "anthropic-version": "2023-06-01",
    }

    def _do_request():
        if HAS_HTTPX:
            with httpx.Client(timeout=timeout) as client:
                resp = client.post(url, json=body, headers=headers)
                resp.raise_for_status()
                data = resp.json()
        else:
            req = urllib.request.Request(
                url,
                data=json.dumps(body).encode(),
                headers=headers,
            )
            with urllib.request.urlopen(req, timeout=timeout) as resp:
                data = json.loads(resp.read())

        usage = data.get("usage", {})
        return {
            "content": data["content"][0]["text"],
            "input_tokens": usage.get("input_tokens", 0),
            "output_tokens": usage.get("output_tokens", 0),
            "model": model,
        }

    return _retry_with_backoff(_do_request)


def get_available_models() -> dict:
    """获取所有可用模型"""
    config = load_empire_config()
    return config.get("models", {})