文件预览

scope_manager.py

查看 1688 Shop Operate 技能包中的文件内容。

文件内容

scripts/scope_manager.py

"""
Scope 权限列表管理

职责:从服务端查询所有可用 scope,本地文件缓存(24h TTL)。
"""
from __future__ import annotations

import json
import logging
import ssl
import time
from pathlib import Path
from urllib.request import Request, urlopen
from urllib.error import HTTPError, URLError

from _const import (
    SCOPE_LIST_ENDPOINT,
    SCOPE_CACHE_FILE,
    SCOPE_CACHE_TTL,
    HTTP_TIMEOUT,
)
from _auth import build_auth_headers

logger = logging.getLogger(__name__)


def _fetch_from_api() -> list[dict]:
    body_str = json.dumps({})

    from urllib.parse import urlparse as _urlparse
    endpoint_path = _urlparse(SCOPE_LIST_ENDPOINT).path

    auth_headers = build_auth_headers("POST", endpoint_path, body_str)
    if auth_headers is None:
        logger.warning("AK 未配置,将以无签名方式发送 Scope 查询请求")
        auth_headers = {"Content-Type": "application/json"}

    req = Request(SCOPE_LIST_ENDPOINT, data=body_str.encode("utf-8"),
                  headers=auth_headers, method="POST")

    ctx = ssl.create_default_context()
    with urlopen(req, timeout=HTTP_TIMEOUT, context=ctx) as resp:
        raw_body = json.loads(resp.read().decode("utf-8"))

    if not raw_body.get("success"):
        raise RuntimeError(raw_body.get("message") or raw_body.get("msgInfo")
                           or "服务端返回 success=false")

    scopes = raw_body.get("data", {}).get("response", [])
    if not isinstance(scopes, list):
        raise RuntimeError("响应格式异常:data.response 不是数组")
    return scopes


def _load_cache(cache_file: Path = SCOPE_CACHE_FILE) -> tuple[list[dict] | None, bool]:
    if not cache_file.exists():
        return None, False
    try:
        data = json.loads(cache_file.read_text(encoding="utf-8"))
        cached_at = data.get("cached_at", 0)
        scopes = data.get("scopes", [])
        if not isinstance(scopes, list) or not scopes:
            return None, False
        fresh = (time.time() - cached_at) < SCOPE_CACHE_TTL
        return scopes, fresh
    except Exception as e:
        logger.warning("读取 scope 缓存失败: %s", e)
        return None, False


def _save_cache(scopes: list[dict], cache_file: Path = SCOPE_CACHE_FILE) -> None:
    try:
        cache_data = {"cached_at": int(time.time()), "scopes": scopes}
        cache_file.write_text(json.dumps(cache_data, ensure_ascii=False, indent=2),
                              encoding="utf-8")
    except Exception as e:
        logger.warning("写入 scope 缓存失败: %s", e)


def query_all_scope(cache_file: Path = SCOPE_CACHE_FILE) -> dict:
    """
    查询所有可用 scope(带缓存)。

    Returns:
        成功: {"success": True, "scopes": [...], "from_cache": bool}
        失败: {"success": False, "error": str, "error_description": str}
    """
    cached, fresh = _load_cache(cache_file)
    if cached and fresh:
        return {"success": True, "scopes": cached, "from_cache": True}

    try:
        scopes = _fetch_from_api()
        _save_cache(scopes, cache_file)
        return {"success": True, "scopes": scopes, "from_cache": False}
    except HTTPError as e:
        error_body = ""
        try:
            error_body = e.read().decode("utf-8")
        except Exception:
            pass
        error_msg = f"HTTP {e.code}: {error_body}" if error_body else f"HTTP {e.code}"
        logger.warning("查询 scope 列表失败: %s", error_msg)
    except URLError as e:
        error_msg = f"网络错误: {e.reason}"
        logger.warning("查询 scope 列表失败: %s", error_msg)
    except Exception as e:
        error_msg = str(e)
        logger.warning("查询 scope 列表失败: %s", error_msg)

    if cached:
        return {"success": True, "scopes": cached, "from_cache": True, "stale": True}

    return {"success": False, "error": "SCOPE_QUERY_FAILED", "error_description": error_msg}


def format_scope_list_markdown(scopes: list[dict]) -> str:
    risk_order = {"低": 0, "中": 1, "高": 2}
    risk_labels = {"低": "低风险权限", "中": "中风险权限", "高": "高风险权限"}

    groups: dict[str, list[dict]] = {}
    for s in scopes:
        level = s.get("riskLevel", "低")
        groups.setdefault(level, []).append(s)

    parts = []
    for level in sorted(groups.keys(), key=lambda x: risk_order.get(x, 99)):
        label = risk_labels.get(level, f"{level}风险权限")
        parts.append(f"**{label}**\n")
        parts.append("| 权限说明 | 标识符 | 风险等级 |")
        parts.append("|----------|--------|----------|")
        for s in groups[level]:
            desc = s.get("description", s.get("scope", ""))
            scope_id = s.get("scope", "")
            parts.append(f"| {desc} | `{scope_id}` | {level} |")
        parts.append("")

    return "\n".join(parts)