文件预览

pansou.py

查看 Pansou 技能包中的文件内容。

文件内容

pansou.py

#!/usr/bin/env python3
"""
PanSou 盘搜 — 统一搜索入口
API: POST http://127.0.0.1:1080/api/search
     POST http://127.0.0.1:1080/api/check/links

流程:
  1. 先搜原始关键词
  2. 评估结果质量,不足则自动扩展
  3. 合并去重 → 链接检测 → 排序输出

用法:
  python3 pansou.py <关键词>            # 默认搜索
  python3 pansou.py <关键词> --json     # 输出 JSON 结果
  python3 pansou.py <关键词> --expand   # 强制扩展多语言搜索
"""
import contextlib
import json
import os
import re
import sys
import time
import urllib.request

# Base URL of the PanSou service. It can be overridden with the PANSOU_API
# environment variable; a trailing "/" is stripped so the endpoint paths
# below join cleanly.
API_HOST = os.environ.get("PANSOU_API", "http://127.0.0.1:1080").rstrip("/")
# Endpoint used by search_one (keyword search).
API_SEARCH = f"{API_HOST}/api/search"
# Endpoint used by check_links (link validity check).
API_CHECK  = f"{API_HOST}/api/check/links"
# Maximum number of items search_one keeps for each result type.
PANSOU_MAX_PER_TYPE = 5

# ─── Deciding whether results need expansion ─────────
# assess_quality asks for expansion when there are fewer than MIN_RESULTS
# results, or when fewer than QUALITY_THRESHOLD of them look relevant.
MIN_RESULTS = 5
QUALITY_THRESHOLD = 3


def assess_quality(results: list[dict], kw: str) -> tuple[bool, str]:
    """Decide whether the initial search results are good enough.

    A result counts as an exact hit when the lower-cased keyword is a
    substring of its lower-cased title, and as a fuzzy hit when its title
    contains at least one CJK character of the keyword.

    Args:
        results: Result dicts; only the ``title`` field is read. A missing or
            ``None`` title is treated as an empty string.
        kw: The keyword the results were searched with.

    Returns:
        ``(need_expand, reason)``. ``need_expand`` is True when there are no
        results, fewer than ``MIN_RESULTS`` results, or fewer than
        ``QUALITY_THRESHOLD`` exact hits *and* fewer than
        ``QUALITY_THRESHOLD`` fuzzy hits. ``reason`` is a short summary for
        display.
    """
    if not results:
        return True, "无结果"

    total = len(results)
    kw_lower = kw.lower()
    # Only the keyword's CJK characters take part in the fuzzy match. Matching
    # on every character would let a space or digit in a mixed keyword such as
    # "原神 4" mark almost any title as relevant.
    kw_cjk = [c for c in kw if '\u4e00' <= c <= '\u9fff']

    exact_hits = 0
    fuzzy_hits = 0
    for r in results:
        title = r.get("title") or ""
        if kw_lower in title.lower():
            exact_hits += 1
        if any(c in title for c in kw_cjk):
            fuzzy_hits += 1

    reason = f"共{total}条,相关{exact_hits}条"
    if total < MIN_RESULTS:
        return True, f"{reason}(<{MIN_RESULTS}条)"
    if exact_hits < QUALITY_THRESHOLD and fuzzy_hits < QUALITY_THRESHOLD:
        return True, f"{reason}(相关结果少)"
    return False, reason


def gen_expand_queries(kw: str) -> list[str]:
    """Return alternative-language queries for a known keyword.

    The keyword is matched against a built-in table of Chinese titles: an
    entry applies when the title is contained in the keyword or the keyword
    is contained in the title. Variants that already appear in the keyword
    (case-insensitively) are left out.

    Args:
        kw: The user's keyword. Surrounding whitespace is ignored.

    Returns:
        The matching variants in table order without duplicates, or an empty
        list when the keyword is blank or matches no entry.
    """
    KNOWN = {
        "王国之泪": ["tears of the kingdom", "totk", "ゼルダの伝説 ティアーズ オブ ザ キングダム"],
        "旷野之息": ["breath of the wild", "botw", "ゼルダの伝説 ブレス オブ ザ ワイルド"],
        "塞尔达": ["zelda legend", "zelda", "ゼルダの伝説"],
        "原神": ["genshin impact", "genshin"],
        "黑神话悟空": ["black myth wukong"],
        "霍格沃茨": ["hogwarts legacy"],
        "赛博朋克2077": ["cyberpunk 2077"],
        "最终幻想7": ["final fantasy 7", "ff7"],
        "艾尔登法环": ["elden ring"],
        "勇者斗恶龙": ["dragon quest"],
    }
    kw = kw.strip()
    if not kw:
        # "" is a substring of every title, so without this guard a blank
        # keyword would match (and return) the whole table.
        return []

    kw_lower = kw.lower()
    expanded: list[str] = []
    seen: set[str] = set()
    for cn, variants in KNOWN.items():
        if cn not in kw and kw not in cn:
            continue
        for v in variants:
            v_key = v.lower()
            # Skip variants the user already typed and ones contributed by an
            # earlier matching entry.
            if v_key in kw_lower or v_key in seen:
                continue
            seen.add(v_key)
            expanded.append(v)
    return expanded


# ─── 搜索 ────────────────────────────────────────────

def search_one(kw: str) -> tuple[list[dict], str | None]:
    try:
        req = urllib.request.Request(
            API_SEARCH,
            data=json.dumps({"kw": kw}).encode(),
            headers={"Content-Type": "application/json"},
            method="POST"
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read().decode())
    except Exception as e:
        return [], f"请求失败: {e}"
    if data.get("code") != 0:
        return [], f"API错误: {data.get('message', 'unknown')}"
    merged = data.get("data", {}).get("merged_by_type", {})
    results = []
    for ptype, items in merged.items():
        for item in items[:PANSOU_MAX_PER_TYPE]:
            results.append({
                "type": ptype,
                "title": item.get("note", ""),
                "url": item.get("url", ""),
                "password": item.get("password", ""),
                "datetime": (item.get("datetime", "") or "")[:10],
                "source": item.get("source", ""),
                "_query": kw,
            })
    return results, None


def dedup_results(results: list[dict]) -> list[dict]:
    """Drop results without a URL and repeated (type, url) pairs.

    The first result seen for each pair is kept, and the input order is
    preserved.
    """
    first_by_key: dict[tuple, dict] = {}
    for entry in results:
        fingerprint = (entry.get("type", ""), entry.get("url", ""))
        if fingerprint in first_by_key or not fingerprint[1]:
            continue
        first_by_key[fingerprint] = entry
    return list(first_by_key.values())


# ─── 链接检测 ────────────────────────────────────────

# Result types that check_links never submits to the link-check endpoint.
# Results of these types (including an empty type) are kept in the output
# without being verified.
SKIP_CHECK_TYPES = {"magnet", "ed2k", "guangya", "others", ""}


def check_links(results: list[dict]) -> tuple[list[dict], dict]:
    """Validate share links through the link-check endpoint and drop dead ones.

    Results whose type is in ``SKIP_CHECK_TYPES`` or whose URL is empty are
    never submitted and are always kept. All other results are POSTed to
    ``API_CHECK`` in one request as ``{"items": [...]}``, and the ``results``
    list of the response is read as ``{"url": ..., "state": ...}`` entries.

    Args:
        results: Result dicts; ``type``, ``url`` and ``password`` are read.
            Results reported as ``locked`` get ``locked = True`` set on the
            dict in place.

    Returns:
        ``(kept, stats)``. ``kept`` is ``results`` in the original order
        without the links reported as ``bad``. ``stats`` counts the checked
        links per state (``ok``, ``bad``, ``locked``, ``uncertain``,
        ``unsupported``); a link with a missing or unrecognised state counts
        as ``uncertain``. When the request fails or the response is
        malformed, a warning is printed, ``stats["error"]`` is True and
        ``results`` is returned unfiltered.
    """
    stats = {"ok": 0, "bad": 0, "locked": 0, "uncertain": 0, "unsupported": 0, "error": False}

    def is_checkable(r: dict) -> bool:
        """Return True when the result should be sent to the check endpoint."""
        return r.get("type", "") not in SKIP_CHECK_TYPES and bool(r.get("url"))

    def skip_check(reason) -> tuple[list[dict], dict]:
        """Report a failed check and fall back to the unfiltered results."""
        print(f"\n⚠️ 链接检测失败: {reason},跳过检测")
        stats["error"] = True
        return results, stats

    items = [
        {
            "disk_type": r.get("type", ""),
            "url": r["url"],
            "password": r.get("password") or "",
        }
        for r in results
        if is_checkable(r)
    ]
    if not items:
        return results, stats

    try:
        req = urllib.request.Request(
            API_CHECK,
            data=json.dumps({"items": items}).encode(),
            headers={"Content-Type": "application/json"},
            method="POST"
        )
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read().decode())
    except Exception as e:
        # Best effort: link checking is optional, so any failure keeps the
        # results instead of aborting the search.
        return skip_check(e)

    # A malformed body is handled like a failed request rather than letting
    # a KeyError/TypeError escape after the search itself succeeded.
    entries = data.get("results", []) if isinstance(data, dict) else None
    if not isinstance(entries, list):
        return skip_check("响应格式异常")

    state_map = {
        c["url"]: c.get("state")
        for c in entries
        if isinstance(c, dict) and isinstance(c.get("url"), str)
    }
    known_states = ("ok", "bad", "locked", "unsupported")

    valid = []
    for r in results:
        if not is_checkable(r):
            valid.append(r)
            continue
        state = state_map.get(r["url"], "uncertain")
        if state not in known_states:
            state = "uncertain"
        stats[state] += 1
        if state == "bad":
            continue  # dead link: filtered out of the output
        if state == "locked":
            r["locked"] = True
        valid.append(r)

    return valid, stats


# ─── 排序 ────────────────────────────────────────────

# Sort rank per result type, used by sort_results: a lower number sorts
# earlier. Types missing from this table are ranked 99 there.
DISK_PRIORITY = {
    "xunlei": 1, "aliyun": 2, "115": 3, "quark": 4,
    "magnet": 5, "ed2k": 6, "baidu": 7, "uc": 8,
    "tianyi": 9, "mobile": 10, "pikpak": 11, "123": 12,
    "others": 13, "guangya": 14,
}
# Icon shown in the group header that format_table prints for each type.
# format_table falls back to "📎" for types missing from this table.
DISK_EMOJI = {
    "xunlei": "⚡", "aliyun": "☁️", "115": "1️⃣1️⃣5️⃣",
    "quark": "🟣", "magnet": "🧲", "ed2k": "🐴",
    "baidu": "💾", "uc": "🌊", "tianyi": "📡",
    "mobile": "📱", "pikpak": "📦", "123": "🔢",
    "others": "📎", "guangya": "🦆",
}
# Display name shown in the group header that format_table prints for each
# type. format_table falls back to the raw type key for unknown types.
DISK_NAME = {
    "xunlei": "迅雷网盘", "aliyun": "阿里云盘", "115": "115网盘",
    "quark": "夸克网盘", "magnet": "磁力链接", "ed2k": "电驴链接",
    "baidu": "百度网盘", "uc": "UC网盘", "tianyi": "天翼云盘",
    "mobile": "移动云盘", "pikpak": "PikPak", "123": "123网盘",
    "others": "其他", "guangya": "光鸭云盘",
}


def relevance_score(r: dict, kw: str) -> int:
    """Score how well a result's title matches the keyword.

    Both the title and the stripped keyword are compared in lower case.

    Args:
        r: Result dict; only ``title`` is read. A missing or ``None`` title
            is treated as an empty string.
        kw: The search keyword.

    Returns:
        1000 when the title equals the keyword, 900 when it starts with it.
        When the keyword occurs elsewhere in the title: 700 if the
        characters on both sides are not CJK, 600 if only the left side is
        not CJK, 500 if only the right side is not CJK, 300 otherwise.
        When the keyword does not occur: 15 points for each CJK character of
        the keyword found in the title. 0 for a blank keyword or no match.
    """
    def is_cjk(c: str) -> bool:
        return '\u4e00' <= c <= '\u9fff'

    title = (r.get("title") or "").lower()
    needle = kw.strip()
    kw_l = needle.lower()
    if not kw_l:
        # "" is a prefix of every string, so a blank keyword would otherwise
        # score 900 for every title.
        return 0

    if title == kw_l:
        return 1000
    if title.startswith(kw_l):
        return 900

    idx = title.find(kw_l)
    if idx >= 0:
        before = title[:idx]
        after = title[idx + len(kw_l):]
        # A neighbouring CJK character suggests the keyword is part of a
        # longer word, which makes the hit weaker.
        front_ok = not before or not is_cjk(before[-1])
        back_ok = not after or not is_cjk(after[0])
        if front_ok and back_ok:
            return 700
        if front_ok:
            return 600
        if back_ok:
            return 500
        return 300

    # No substring hit: fall back to counting shared CJK characters.
    return 15 * sum(1 for c in needle if is_cjk(c) and c in title)


def sort_results(results: list[dict], kw: str) -> list[dict]:
    """Return the results ordered by disk priority, then by relevance.

    Types are ranked with ``DISK_PRIORITY`` (unknown types rank 99, lower
    first). Within one type, higher ``relevance_score`` comes first. Ties
    keep their input order.
    """
    def rank(entry: dict) -> tuple[int, int]:
        score = relevance_score(entry, kw)
        return DISK_PRIORITY.get(entry.get("type", ""), 99), -score

    return sorted(results, key=rank)


# ─── 输出 ────────────────────────────────────────────

def format_table(kw: str, results: list[dict], stats: dict, elapsed: float,
                 expanded: bool = False, extra_queries: list[str] = None) -> str:
    lines = [f"🐉 **「{kw}」搜索结果(共{len(results)}条)**\n"]

    if expanded and extra_queries:
        lines.append(f"  🔄 已扩展: {kw} + {' + '.join(extra_queries)}")
        lines.append("")

    if stats.get("bad"):
        lines.append(f"  ❌ 失效已过滤: {stats['bad']}条")
    if stats.get("locked"):
        lines.append(f"  🔒 需密码: {stats['locked']}条")
    if stats.get("uncertain"):
        lines.append(f"  ❓ 状态未知: {stats['uncertain']}条(保留)")
    if stats.get("unsupported"):
        lines.append(f"  ⚪ 不支持检测: {stats['unsupported']}条")
    if stats.get("ok"):
        lines.append(f"  ✅ 有效: {stats['ok']}条")
    if not any(stats.get(k) for k in ("bad", "locked", "uncertain", "unsupported", "ok")) and not stats.get("error"):
        pass  # 没有任何检测数据,不打印摘要
    lines.append("")

    current_type = None
    for r in results:
        ptype = r["type"]
        if ptype != current_type:
            current_type = ptype
            emoji = DISK_EMOJI.get(ptype, "📎")
            name = DISK_NAME.get(ptype, ptype)
            lines.append(f"\n{emoji} **{name}**")

        title = r["title"]
        pwd = r.get("password", "")
        locked = r.get("locked")
        dt = r.get("datetime", "")
        url = r.get("url", "")

        entry = f"  • {title}"
        if dt and dt not in ("", "0001-01-01"):
            entry += f" `{dt}`"
        if pwd:
            entry += f" 🔑{pwd}"
        elif locked:
            entry += " 🔒需提取码"
        lines.append(entry)

        if url:
            lines.append(f"    🔗 {url}")
        else:
            lines.append(f"    ⚠️ 无直链")

    lines.append(f"\n⏱ {elapsed:.1f}秒")
    lines.append("\n---\n💡 **推荐用迅雷下载**:告诉我要下哪个,我帮你调用 xunlei 下载")
    return "\n".join(lines)


def format_json(kw: str, results: list[dict], elapsed: float) -> dict:
    """Build the JSON-serialisable summary of a search run.

    Returns a dict with the keyword (``kw``), the number of results
    (``total``), the elapsed seconds rounded to one decimal (``elapsed``)
    and the result list itself (``results``, not copied).
    """
    payload = dict(kw=kw, total=len(results))
    payload["elapsed"] = round(elapsed, 1)
    payload["results"] = results
    return payload


# ─── 主程序 ──────────────────────────────────────────

if __name__ == "__main__":
    kw = sys.argv[1] if len(sys.argv) > 1 else ""
    if not kw:
        print("用法: python3 pansou.py <关键词> [--json] [--expand]")
        sys.exit(1)

    mode = "json" if "--json" in sys.argv[2:] else "table"
    force_expand = "--expand" in sys.argv[2:]

    t0 = time.time()

    # Step 1: 先搜原始词
    print(f"\n🔍 搜索: {kw}")
    results, err = search_one(kw)
    if err:
        print(f"❌ {err}")
        sys.exit(1)

    all_results = list(results)
    expanded_queries = []

    if not results:
        # 0 结果:脚本只报告,AI 层决定是否联网找更好的词
        print("⚠️ 搜 0 条,如需联网找更好检索词请告知")
    else:
        # Step 2: 评估是否需要扩展
        if force_expand:
            need_expand, reason = True, "强制扩展"
        else:
            need_expand, reason = assess_quality(results, kw)

        if need_expand:
            print(f"  📊 {reason},开始扩展搜索...")
            extra = gen_expand_queries(kw)
            if extra:
                expanded_queries = extra
                print(f"  🔄 扩展词: {' + '.join(extra)}")
                for q in extra:
                    print(f"  🔍 搜: {q}")
                    res, _ = search_one(q)
                    all_results.extend(res)
            else:
                print("  ⚠️ 无可用扩展词")
        else:
            print(f"  ✅ 结果良好,无需扩展({reason})")

    # 去重
    results = dedup_results(all_results)
    raw_count = len(all_results)
    print(f"\n📡 共 {raw_count} 条 → 去重后 {len(results)} 条,开始检测链接有效性...")

    # 链接检测
    valid_results, stats = check_links(results)

    # 排序
    sorted_results = sort_results(valid_results, kw)
    elapsed = time.time() - t0

    if mode == "json":
        print(json.dumps(format_json(kw, sorted_results, elapsed), ensure_ascii=False, indent=2))
    else:
        print(format_table(kw, sorted_results, stats, elapsed,
                          expanded=bool(expanded_queries),
                          extra_queries=expanded_queries))

    # 保存
    save_path = os.path.join(os.path.dirname(__file__), "search_result.json")
    with open(save_path, "w", encoding="utf-8") as f:
        json.dump(format_json(kw, sorted_results, elapsed), f, ensure_ascii=False, indent=2)
    print(f"\n💾 结果已保存: {save_path}")