# File contents: pansou.py
#!/usr/bin/env python3
"""
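PanSou disk search — unified search entry point.

API (the host can be overridden with the PANSOU_API environment variable):
    POST http://127.0.0.1:1080/api/search
    POST http://127.0.0.1:1080/api/check/links

Flow:
    1. Search the original keyword first.
    2. Assess the result quality; expand the search automatically when it
       is insufficient.
    3. Merge and de-duplicate -> check links -> sort and print.

Usage:
    python3 pansou.py <keyword>            # default search
    python3 pansou.py <keyword> --json     # print the results as JSON
    python3 pansou.py <keyword> --expand   # force the multilingual expanded search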
"""
import json, re, sys, urllib.request, time
import json, re, sys, urllib.request, time, os
# Base URL of the PanSou service. The PANSOU_API environment variable
# overrides the default; a trailing "/" is stripped so that the endpoint
# paths below can be appended directly.
API_HOST = os.getenv("PANSOU_API", "http://127.0.0.1:1080").rstrip("/")
API_SEARCH = API_HOST + "/api/search"
API_CHECK = API_HOST + "/api/check/links"

# Maximum number of entries kept per disk type from one search response.
PANSOU_MAX_PER_TYPE = 5
# ─── Deciding whether the search needs to be expanded ────────────────
# Fewer total results than this always triggers an expanded search.
MIN_RESULTS = 5
# Minimum number of relevant titles needed to accept the results as-is.
QUALITY_THRESHOLD = 3


def assess_quality(results: list[dict], kw: str) -> tuple[bool, str]:
    """Assess the quality of search results.

    A title is "relevant" when it contains the whole keyword
    (case-insensitive). It is "CJK-relevant" when it contains at least one
    of the keyword's CJK ideographs (U+4E00..U+9FFF).

    Args:
        results: Result dicts; only the ``"title"`` key is read. A missing
            or ``None`` title is treated as an empty string.
        kw: The keyword that produced ``results``.

    Returns:
        ``(need_expand, reason)``. ``need_expand`` is True when there are
        no results, fewer than ``MIN_RESULTS`` results, or fewer than
        ``QUALITY_THRESHOLD`` titles by both relevance measures.
    """
    if not results:
        return True, "无结果"

    total = len(results)
    kw_lower = kw.lower()
    # Only the ideographs of the keyword take part in the loose match.
    # Matching on every character would let a space or a digit in the
    # keyword make nearly any title look relevant.
    kw_cjk = {c for c in kw if '\u4e00' <= c <= '\u9fff'}
    titles = [r.get("title") or "" for r in results]

    relevant = sum(1 for t in titles if kw_lower in t.lower())
    relevant_cjk = sum(1 for t in titles if any(c in t for c in kw_cjk))

    reason = f"共{total}条,相关{relevant}条"
    if total < MIN_RESULTS:
        return True, f"{reason}(<{MIN_RESULTS}条)"
    if relevant < QUALITY_THRESHOLD and relevant_cjk < QUALITY_THRESHOLD:
        return True, f"{reason}(相关结果少)"
    return False, reason
def gen_expand_queries(kw: str) -> list[str]:
    """Build extra search queries (English/Japanese) for a known title.

    A table entry applies when its Chinese name contains the keyword or
    the keyword contains the Chinese name. Aliases that already occur in
    the keyword (case-insensitive) are left out.

    Args:
        kw: The keyword typed by the user.

    Returns:
        The alias queries in table order; empty when nothing matches.
    """
    known_titles = {
        "王国之泪": ["tears of the kingdom", "totk", "ゼルダの伝説 ティアーズ オブ ザ キングダム"],
        "旷野之息": ["breath of the wild", "botw", "ゼルダの伝説 ブレス オブ ザ ワイルド"],
        "塞尔达": ["zelda legend", "zelda", "ゼルダの伝説"],
        "原神": ["genshin impact", "genshin"],
        "黑神话悟空": ["black myth wukong"],
        "霍格沃茨": ["hogwarts legacy"],
        "赛博朋克2077": ["cyberpunk 2077"],
        "最终幻想7": ["final fantasy 7", "ff7"],
        "艾尔登法环": ["elden ring"],
        "勇者斗恶龙": ["dragon quest"],
    }
    needle = kw.lower()
    queries: list[str] = []
    for chinese_name, aliases in known_titles.items():
        if not (chinese_name in kw or kw in chinese_name):
            continue
        queries.extend(
            alias for alias in aliases if alias.lower() not in needle
        )
    return queries
# ─── Search ──────────────────────────────────────────────────────────
def search_one(kw: str) -> tuple[list[dict], str | None]:
    """Run one keyword search against the search endpoint.

    POSTs ``{"kw": kw}`` as JSON to ``API_SEARCH`` and flattens the
    ``data.merged_by_type`` mapping of the response into a list, keeping
    at most ``PANSOU_MAX_PER_TYPE`` entries per disk type.

    Args:
        kw: The keyword to search for.

    Returns:
        ``(results, error)``. On success ``error`` is None. On failure
        ``results`` is empty and ``error`` is a printable message. Every
        result has the keys ``type``, ``title``, ``url``, ``password``,
        ``datetime`` (first 10 characters only), ``source`` and ``_query``;
        the text fields are always strings, never None.
    """
    payload = json.dumps({"kw": kw}).encode()
    try:
        req = urllib.request.Request(
            API_SEARCH,
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            data = json.loads(resp.read().decode())
    except Exception as e:
        # Deliberately broad: any transport or decoding failure is reported
        # to the caller as an error string instead of a traceback.
        return [], f"请求失败: {e}"

    if not isinstance(data, dict):
        return [], "API错误: invalid response"
    if data.get("code") != 0:
        return [], f"API错误: {data.get('message', 'unknown')}"

    # JSON null decodes to None, so a key that is present with a null value
    # bypasses the .get() default; fall back with `or` at every level.
    body = data.get("data") or {}
    merged = body.get("merged_by_type") if isinstance(body, dict) else None
    if not isinstance(merged, dict):
        merged = {}

    results = []
    for ptype, items in merged.items():
        for item in (items or [])[:PANSOU_MAX_PER_TYPE]:
            if not isinstance(item, dict):
                continue
            results.append({
                "type": ptype,
                "title": item.get("note") or "",
                "url": item.get("url") or "",
                "password": item.get("password") or "",
                "datetime": (item.get("datetime") or "")[:10],
                "source": item.get("source") or "",
                "_query": kw,
            })
    return results, None
def dedup_results(results: list[dict]) -> list[dict]:
    """Drop duplicate and URL-less results, keeping first occurrences.

    Two results are duplicates when they share the same ``(type, url)``
    pair. Results whose URL is missing or empty are discarded.

    Args:
        results: Result dicts in their original order.

    Returns:
        A new list holding the first result for every distinct pair, in
        the order in which those results first appeared.
    """
    # A dict keeps insertion order, so it serves both as the "seen" set
    # and as the ordered output.
    unique: dict[tuple, dict] = {}
    for entry in results:
        ident = (entry.get("type", ""), entry.get("url", ""))
        if ident in unique or not ident[1]:
            continue
        unique[ident] = entry
    return list(unique.values())
# ─── Link checking ───────────────────────────────────────────────────
# Result types that are never sent to the checker; such results are always
# kept and never counted in the statistics.
SKIP_CHECK_TYPES = {"magnet", "ed2k", "guangya", "others", ""}


def check_links(results: list[dict]) -> tuple[list[dict], dict]:
    """Check the share links through ``API_CHECK`` and drop dead ones.

    Results whose type is in ``SKIP_CHECK_TYPES`` or that have no URL are
    not checked. Every other result is POSTed in a single request as
    ``{"items": [{"disk_type", "url", "password"}, ...]}``.

    Args:
        results: Result dicts. Entries reported as ``"locked"`` are marked
            in place with ``r["locked"] = True``.

    Returns:
        ``(valid, stats)``. ``valid`` holds every result except those
        reported as ``"bad"``. ``stats`` counts the ``ok``, ``bad``,
        ``locked``, ``uncertain`` and ``unsupported`` states; a missing or
        unrecognised state counts as ``uncertain``. ``stats["error"]`` is
        True when the check request failed, in which case ``results`` is
        returned unfiltered.
    """
    stats = {"ok": 0, "bad": 0, "locked": 0, "uncertain": 0,
             "unsupported": 0, "error": False}
    items = [
        {
            "disk_type": r.get("type", ""),
            "url": r["url"],
            "password": r.get("password") or "",
        }
        for r in results
        if r.get("type", "") not in SKIP_CHECK_TYPES and r.get("url")
    ]
    if not items:
        return results, stats

    try:
        req = urllib.request.Request(
            API_CHECK,
            data=json.dumps({"items": items}).encode(),
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=60) as resp:
            data = json.loads(resp.read().decode())
        if not isinstance(data, dict):
            # Handled like any other failed check by the except below.
            raise ValueError("unexpected response format")
    except Exception as e:
        # Best effort: a failed check must not lose the search results.
        print(f"\n⚠️ 链接检测失败: {e},跳过检测")
        stats["error"] = True
        return results, stats

    # Tolerate "results": null and entries without url/state instead of
    # raising; such links simply end up counted as "uncertain".
    state_map = {
        c["url"]: c.get("state")
        for c in data.get("results") or []
        if isinstance(c, dict) and c.get("url")
    }

    known_states = ("ok", "bad", "locked", "unsupported")
    valid = []
    for r in results:
        url = r.get("url", "")
        if r.get("type", "") in SKIP_CHECK_TYPES or not url:
            valid.append(r)
            continue
        state = state_map.get(url, "uncertain")
        if state not in known_states:
            state = "uncertain"
        stats[state] += 1
        if state == "bad":
            continue  # dead link: filtered out
        if state == "locked":
            r["locked"] = True
        valid.append(r)
    return valid, stats
# ─── Sorting ─────────────────────────────────────────────────────────
# Rank of every disk type (1 sorts first), derived from the position in
# the tuple below.
DISK_PRIORITY = {
    disk: rank
    for rank, disk in enumerate(
        (
            "xunlei", "aliyun", "115", "quark",
            "magnet", "ed2k", "baidu", "uc",
            "tianyi", "mobile", "pikpak", "123",
            "others", "guangya",
        ),
        start=1,
    )
}

# Emoji shown in front of each disk type's section heading.
DISK_EMOJI = {
    "xunlei": "⚡",
    "aliyun": "☁️",
    "115": "1️⃣1️⃣5️⃣",
    "quark": "🟣",
    "magnet": "🧲",
    "ed2k": "🐴",
    "baidu": "💾",
    "uc": "🌊",
    "tianyi": "📡",
    "mobile": "📱",
    "pikpak": "📦",
    "123": "🔢",
    "others": "📎",
    "guangya": "🦆",
}

# Display name of each disk type.
DISK_NAME = {
    "xunlei": "迅雷网盘",
    "aliyun": "阿里云盘",
    "115": "115网盘",
    "quark": "夸克网盘",
    "magnet": "磁力链接",
    "ed2k": "电驴链接",
    "baidu": "百度网盘",
    "uc": "UC网盘",
    "tianyi": "天翼云盘",
    "mobile": "移动云盘",
    "pikpak": "PikPak",
    "123": "123网盘",
    "others": "其他",
    "guangya": "光鸭云盘",
}
def relevance_score(r: dict, kw: str) -> int:
    """Score how well a result title matches the keyword.

    The comparison is case-insensitive and ignores whitespace around the
    keyword. A missing or ``None`` title is treated as an empty string.

    Scores, highest first:
        1000  the title equals the keyword
         900  the title starts with the keyword
         700  the keyword occurs with no CJK ideograph directly on either side
         600  only the character before the keyword is a non-ideograph
         500  only the character after the keyword is a non-ideograph
              (or the keyword ends the title)
         300  the keyword is surrounded by ideographs on both sides
        15*n  no full match; ``n`` of the keyword's ideographs occur in
              the title (0 when none do)

    Args:
        r: Result dict; only ``"title"`` is read.
        kw: The keyword typed by the user.

    Returns:
        The relevance score; larger means more relevant.
    """
    def is_cjk(ch: str) -> bool:
        return '\u4e00' <= ch <= '\u9fff'

    title = (r.get("title") or "").lower()
    kw_l = kw.strip().lower()

    if title == kw_l:
        return 1000
    if title.startswith(kw_l):
        return 900

    idx = title.find(kw_l)
    if idx != -1:
        # The prefix case returned above, so idx >= 1 and there is always
        # a character in front of the match.
        end = idx + len(kw_l)
        front_ok = not is_cjk(title[idx - 1])
        back_ok = end >= len(title) or not is_cjk(title[end])
        if front_ok and back_ok:
            return 700
        if front_ok:
            return 600
        if back_ok:
            return 500
        return 300

    # No full match: 15 points for every keyword ideograph found in the title.
    return 15 * sum(1 for c in kw.strip() if is_cjk(c) and c in title)
def sort_results(results: list[dict], kw: str) -> list[dict]:
    """Order results by disk-type priority, then by relevance.

    Results are grouped by their ``DISK_PRIORITY`` rank (types not in the
    table rank 99). Inside a group the higher ``relevance_score`` comes
    first, and ties keep their input order because the sort is stable.

    Args:
        results: Result dicts to order.
        kw: Keyword handed to ``relevance_score``.

    Returns:
        A new, sorted list; ``results`` itself is left untouched.
    """
    def ordering(entry: dict) -> tuple[int, int]:
        rank = DISK_PRIORITY.get(entry.get("type", ""), 99)
        return rank, -relevance_score(entry, kw)

    return sorted(results, key=ordering)
# ─── 输出 ────────────────────────────────────────────
def format_table(kw: str, results: list[dict], stats: dict, elapsed: float,
expanded: bool = False, extra_queries: list[str] = None) -> str:
lines = [f"🐉 **「{kw}」搜索结果(共{len(results)}条)**\n"]
if expanded and extra_queries:
lines.append(f" 🔄 已扩展: {kw} + {' + '.join(extra_queries)}")
lines.append("")
if stats.get("bad"):
lines.append(f" ❌ 失效已过滤: {stats['bad']}条")
if stats.get("locked"):
lines.append(f" 🔒 需密码: {stats['locked']}条")
if stats.get("uncertain"):
lines.append(f" ❓ 状态未知: {stats['uncertain']}条(保留)")
if stats.get("unsupported"):
lines.append(f" ⚪ 不支持检测: {stats['unsupported']}条")
if stats.get("ok"):
lines.append(f" ✅ 有效: {stats['ok']}条")
if not any(stats.get(k) for k in ("bad", "locked", "uncertain", "unsupported", "ok")) and not stats.get("error"):
pass # 没有任何检测数据,不打印摘要
lines.append("")
current_type = None
for r in results:
ptype = r["type"]
if ptype != current_type:
current_type = ptype
emoji = DISK_EMOJI.get(ptype, "📎")
name = DISK_NAME.get(ptype, ptype)
lines.append(f"\n{emoji} **{name}**")
title = r["title"]
pwd = r.get("password", "")
locked = r.get("locked")
dt = r.get("datetime", "")
url = r.get("url", "")
entry = f" • {title}"
if dt and dt not in ("", "0001-01-01"):
entry += f" `{dt}`"
if pwd:
entry += f" 🔑{pwd}"
elif locked:
entry += " 🔒需提取码"
lines.append(entry)
if url:
lines.append(f" 🔗 {url}")
else:
lines.append(f" ⚠️ 无直链")
lines.append(f"\n⏱ {elapsed:.1f}秒")
lines.append("\n---\n💡 **推荐用迅雷下载**:告诉我要下哪个,我帮你调用 xunlei 下载")
return "\n".join(lines)
def format_json(kw: str, results: list[dict], elapsed: float) -> dict:
    """Bundle a search run into a JSON-serialisable dict.

    Args:
        kw: The original keyword.
        results: The final result list; stored as-is, not copied.
        elapsed: Total run time in seconds.

    Returns:
        A dict with the keys ``kw``, ``total`` (number of results),
        ``elapsed`` (rounded to one decimal place) and ``results``.
    """
    payload = dict(kw=kw, total=len(results))
    payload["elapsed"] = round(elapsed, 1)
    payload["results"] = results
    return payload
# ─── Entry point ─────────────────────────────────────────────────────
def main(argv: list[str] | None = None) -> int:
    """Run the search pipeline from the command line.

    Steps: search the keyword, optionally search the expansion queries,
    de-duplicate, check the links, sort, print the report (table or JSON)
    and save the JSON next to this script as ``search_result.json``.

    In ``--json`` mode all progress and status messages go to stderr, so
    stdout carries only the JSON document.

    Args:
        argv: Arguments without the program name; defaults to
            ``sys.argv[1:]``. The first argument is the keyword, and
            ``--json`` / ``--expand`` may follow it.

    Returns:
        The process exit code: 0 on success, 1 on a usage error or when
        the initial search fails.
    """
    import contextlib  # local import: only the entry point needs it

    args = sys.argv[1:] if argv is None else argv
    kw = args[0] if args else ""
    if not kw:
        print("用法: python3 pansou.py <关键词> [--json] [--expand]")
        return 1
    flags = args[1:]
    json_mode = "--json" in flags
    force_expand = "--expand" in flags
    t0 = time.time()

    # Keep stdout machine-readable in JSON mode: everything printed while
    # the pipeline runs (including by the helpers) is sent to stderr.
    progress = (contextlib.redirect_stdout(sys.stderr) if json_mode
                else contextlib.nullcontext())
    with progress:
        # Step 1: search the original keyword first.
        print(f"\n🔍 搜索: {kw}")
        results, err = search_one(kw)
        if err:
            print(f"❌ {err}")
            return 1

        all_results = list(results)
        expanded_queries: list[str] = []
        if not results and not force_expand:
            # Zero results: only report it; the caller decides whether to
            # look for a better keyword. An explicit --expand still expands.
            print("⚠️ 搜 0 条,如需联网找更好检索词请告知")
        else:
            # Step 2: decide whether the search needs to be expanded.
            if force_expand:
                need_expand, reason = True, "强制扩展"
            else:
                need_expand, reason = assess_quality(results, kw)
            if need_expand:
                print(f" 📊 {reason},开始扩展搜索...")
                extra = gen_expand_queries(kw)
                if extra:
                    expanded_queries = extra
                    print(f" 🔄 扩展词: {' + '.join(extra)}")
                    for q in extra:
                        print(f" 🔍 搜: {q}")
                        res, q_err = search_one(q)
                        if q_err:
                            # Report the failure; the other queries still run.
                            print(f" ⚠️ {q}: {q_err}")
                        all_results.extend(res)
                else:
                    print(" ⚠️ 无可用扩展词")
            else:
                print(f" ✅ 结果良好,无需扩展({reason})")

        # De-duplicate.
        results = dedup_results(all_results)
        raw_count = len(all_results)
        print(f"\n📡 共 {raw_count} 条 → 去重后 {len(results)} 条,开始检测链接有效性...")
        # Check the links.
        valid_results, stats = check_links(results)
        # Sort.
        sorted_results = sort_results(valid_results, kw)
        elapsed = time.time() - t0

    payload = format_json(kw, sorted_results, elapsed)
    if json_mode:
        print(json.dumps(payload, ensure_ascii=False, indent=2))
    else:
        print(format_table(kw, sorted_results, stats, elapsed,
                           expanded=bool(expanded_queries),
                           extra_queries=expanded_queries))

    # Save. A failed write is reported, not raised: the results were
    # already printed above.
    status_out = sys.stderr if json_mode else sys.stdout
    save_path = os.path.join(os.path.dirname(__file__), "search_result.json")
    try:
        with open(save_path, "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)
    except OSError as e:
        print(f"\n⚠️ 结果保存失败: {e}", file=status_out)
    else:
        print(f"\n💾 结果已保存: {save_path}", file=status_out)
    return 0


if __name__ == "__main__":
    sys.exit(main())