文件预览

scraper.py

查看 抖音爬虫V2 技能包中的文件内容。

文件内容

scripts/scraper.py

#!/usr/bin/env python3
"""抖音爆款爬虫 - 使用 Playwright 自动化浏览器爬取抖音搜索结果,失败时降级到 Brave 搜索"""

import argparse
import csv
import json
import re
import subprocess
import sys
import time
from dataclasses import asdict, dataclass
from datetime import date
from pathlib import Path

# Playwright is an optional dependency.  If importing it fails for any reason,
# sync_playwright is set to None; DouyinScraper._search_playwright checks for
# that sentinel and returns no results, which sends callers to the fallback.
try:
    from playwright.sync_api import sync_playwright
except Exception:
    sync_playwright = None


@dataclass
class VideoData:
    title: str = ""
    description: str = ""
    author: str = ""
    play_count: int = 0
    like_count: int = 0
    comment_count: int = 0
    share_count: int = 0
    url: str = ""
    tags: list | None = None
    publish_time: str = ""

    def to_dict(self) -> dict:
        data = asdict(self)
        data["tags"] = self.tags or []
        return data


def _parse_count(text: str) -> int:
    if not text:
        return 0
    text = text.strip().replace(",", "").replace(" ", "")
    try:
        return int(float(text))
    except ValueError:
        pass
    m = re.match(r"([\d.]+)\s*万", text)
    if m:
        return int(float(m.group(1)) * 10000)
    m = re.match(r"([\d.]+)\s*亿", text)
    if m:
        return int(float(m.group(1)) * 100000000)
    return 0


class DouyinScraper:
    """Collect Douyin video listings for a keyword search or the hot list.

    Keyword search first drives a Chromium browser through Playwright and, if
    that yields nothing, falls back to the Brave web-search API (called via
    ``curl``).  The hot list uses the Brave API only.  When no real data can
    be fetched, placeholder records tagged ``示例数据`` are returned instead.
    """

    DOUYIN_SEARCH = "https://www.douyin.com/search/"
    BRAVE_SEARCH = "https://api.search.brave.com/res/v1/web/search"

    def __init__(self, headless: bool = True, delay: float = 2.0, timeout: int = 30000):
        """Store browser settings.

        Args:
            headless: Whether Chromium is launched without a visible window.
            delay: Extra seconds to wait after the page loads (added to a
                fixed 3-second wait).
            timeout: Passed to ``page.goto`` as its timeout (Playwright
                interprets this value as milliseconds).
        """
        self.headless = headless
        self.delay = delay
        self.timeout = timeout

    def search(self, keyword: str, limit: int) -> list[VideoData]:
        """Return up to ``limit`` videos for ``keyword``.

        Playwright is tried first; if it produces no results the Brave
        web-search fallback (and ultimately placeholder data) is used.
        A non-positive ``limit`` returns an empty list without any I/O.
        """
        if limit <= 0:
            return []

        results = self._search_playwright(keyword, limit)
        if results:
            return results

        print("⚠️  Playwright 未获取到数据,尝试 web search fallback...")
        return self._search_web(keyword, limit)

    def _search_playwright(self, keyword: str, limit: int) -> list[VideoData]:
        """Scrape the Douyin search page with a Chromium browser.

        Returns an empty list when Playwright is unavailable.  Any error,
        including a failure to launch the browser, is reported and whatever
        was collected so far is returned, so callers can fall back.
        """
        if sync_playwright is None:
            return []

        from urllib.parse import quote

        results: list[VideoData] = []
        try:
            with sync_playwright() as p:
                # Launch is inside the try so a missing browser binary leads
                # to the fallback instead of crashing the caller.
                browser = p.chromium.launch(
                    headless=self.headless,
                    args=["--disable-blink-features=AutomationControlled", "--no-sandbox"],
                )
                try:
                    ctx = browser.new_context(
                        user_agent=(
                            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                            "AppleWebKit/537.36 (KHTML, like Gecko) "
                            "Chrome/125.0.0.0 Safari/537.36"
                        ),
                        viewport={"width": 1280, "height": 900},
                        locale="zh-CN",
                    )
                    page = ctx.new_page()
                    # Percent-encode so keywords containing "/", "?" or "#"
                    # stay inside the path segment.
                    url = f"{self.DOUYIN_SEARCH}{quote(keyword, safe='')}"
                    print(f"🔍 [Playwright] 正在搜索: {keyword}")
                    page.goto(url, wait_until="domcontentloaded", timeout=self.timeout)
                    time.sleep(self.delay + 3)

                    # Scroll a few times so lazily loaded cards get rendered.
                    for _ in range(3):
                        page.evaluate("window.scrollBy(0, 800)")
                        time.sleep(0.8)

                    seen: set[str] = set()
                    for card in page.query_selector_all("a[href*='/video/']"):
                        video = self._video_from_card(card, keyword, seen)
                        if video is None:
                            continue
                        results.append(video)
                        if len(results) >= limit:
                            break
                finally:
                    browser.close()
        except Exception as exc:
            print(f"⚠️  Playwright 搜索出错: {exc}")

        if results:
            print(f"✅ [Playwright] 获取到 {len(results)} 条结果")
        return results

    def _video_from_card(self, card, keyword: str, seen: set) -> VideoData | None:
        """Build a record from one anchor element, or None if it should be skipped.

        ``seen`` holds the video ids already emitted and is updated in place
        so repeated links to the same video are reported once.
        """
        from urllib.parse import urljoin

        href = card.get_attribute("href") or ""
        if "/video/" not in href:
            return None
        # urljoin resolves "/video/1", "//host/video/1" and absolute URLs alike.
        video_url = urljoin(self.DOUYIN_SEARCH, href)
        vid = re.search(r"/video/(\d+)", video_url)
        if not vid or vid.group(1) in seen:
            return None
        seen.add(vid.group(1))

        title = card.inner_text().strip()[:200] or f"抖音视频 {vid.group(1)}"
        return VideoData(
            title=title,
            description="",
            author="",
            url=video_url,
            tags=[keyword],
            publish_time=date.today().isoformat(),
        )

    def _brave_search(self, query: str, count: int) -> list[dict]:
        """Run one Brave web-search request through ``curl``.

        Args:
            query: Search query; it is URL-encoded here.
            count: Requested number of results, clamped to 1..10.

        Returns:
            The items under ``web.results`` of the JSON response, or an empty
            list when curl fails or the response has no such items.

        Raises:
            RuntimeError: If no API key is configured.
            Exception: Errors from launching curl, the 15-second timeout, or
                JSON decoding propagate to the caller.
        """
        from urllib.parse import urlencode

        key = self._get_brave_key()
        if not key:
            # Without a key the request cannot succeed; skip the network call.
            raise RuntimeError("未设置 BRAVE_API_KEY 环境变量")

        params = urlencode({"q": query, "count": max(1, min(count, 10))})
        # NOTE(review): the token is passed on curl's command line, where other
        # local processes may be able to see it; consider an HTTP client.
        proc = subprocess.run(
            [
                "curl",
                "-s",
                "-H",
                "Accept: application/json",
                "-H",
                f"X-Subscription-Token: {key}",
                f"{self.BRAVE_SEARCH}?{params}",
            ],
            capture_output=True,
            encoding="utf-8",
            timeout=15,
        )
        if proc.returncode != 0 or not proc.stdout:
            return []

        data = json.loads(proc.stdout)
        web = data.get("web") if isinstance(data, dict) else None
        items = (web or {}).get("results") or []
        return [item for item in items if isinstance(item, dict)]

    @staticmethod
    def _video_from_item(item: dict, tags: list) -> VideoData:
        """Convert one Brave result item into a record carrying ``tags``."""
        return VideoData(
            title=(item.get("title") or "").strip(),
            description=(item.get("description") or "").strip(),
            author="",
            url=item.get("url") or "",
            tags=list(tags),
            publish_time=date.today().isoformat(),
        )

    def _search_web(self, keyword: str, limit: int) -> list[VideoData]:
        """Fallback keyword search through the Brave web-search API.

        Only results whose URL contains ``douyin.com`` are kept.  If the
        request fails or nothing matches, placeholder records are returned.
        """
        results: list[VideoData] = []
        try:
            items = self._brave_search(f"site:douyin.com {keyword}", min(limit, 10))
            results = [
                self._video_from_item(item, [keyword])
                for item in items[:limit]
                if "douyin.com" in (item.get("url") or "")
            ]
        except Exception as exc:
            print(f"⚠️  Web search fallback 出错: {exc}")

        if results:
            print(f"✅ [Web Search] 获取到 {len(results)} 条结果")
        else:
            print("⚠️  Web search 也未获取到数据,返回示例数据")
            results = self._mock_search(keyword, limit)

        return results

    def _get_brave_key(self) -> str:
        """Return the Brave API key from ``BRAVE_API_KEY``, or "" if unset."""
        import os

        return os.environ.get("BRAVE_API_KEY", "").strip()

    def hot(self, category: str, limit: int) -> list[VideoData]:
        """Return up to ``limit`` hot-list entries, optionally for a category.

        The Brave web-search API is the only data source; when it fails or
        returns nothing, placeholder records are returned.  A non-positive
        ``limit`` returns an empty list without any I/O.
        """
        if limit <= 0:
            return []

        results: list[VideoData] = []
        query = f"site:douyin.com 抖音热榜 {category}" if category else "site:douyin.com 抖音热榜"
        tags = ["热榜", category] if category else ["热榜"]
        try:
            items = self._brave_search(query, min(limit, 10))
            results = [self._video_from_item(item, tags) for item in items[:limit]]
        except Exception as exc:
            # Report the failure instead of hiding it; placeholders follow.
            print(f"⚠️  热榜搜索出错: {exc}")

        if results:
            print(f"✅ 获取到 {len(results)} 条热榜数据")
        else:
            print("⚠️  未获取到数据,返回示例数据")
            results = self._mock_hot(category, limit)

        return results

    def _mock_search(self, keyword: str, limit: int) -> list[VideoData]:
        """Return at most 10 placeholder search records tagged ``示例数据``."""
        today = date.today().isoformat()
        return [
            VideoData(
                title=f"{keyword}相关视频 {i + 1}",
                description=f"这是关于{keyword}的示例描述(真实数据暂不可用)",
                author=f"作者{i + 1}",
                play_count=10000 * (i + 1),
                like_count=1000 * (i + 1),
                comment_count=100 * (i + 1),
                share_count=50 * (i + 1),
                url=f"https://www.douyin.com/search/{keyword}",
                tags=[keyword, "热门", "示例数据"],
                publish_time=today,
            )
            for i in range(min(limit, 10))
        ]

    def _mock_hot(self, category: str, limit: int) -> list[VideoData]:
        """Return at most 20 placeholder hot-list records tagged ``示例数据``."""
        today = date.today().isoformat()
        label = category or "全部"
        return [
            VideoData(
                title=f"{label}热榜视频 {i + 1}",
                description=f"{label}分类示例热榜数据(真实数据暂不可用)",
                author=f"热门作者{i + 1}",
                play_count=50000 * (i + 1),
                like_count=5000 * (i + 1),
                comment_count=500 * (i + 1),
                share_count=200 * (i + 1),
                url="https://www.douyin.com/hot",
                tags=["热榜", label, "示例数据"],
                publish_time=today,
            )
            for i in range(min(limit, 20))
        ]


def write_json(items: list[VideoData], output: Path) -> None:
    """Write ``items`` to ``output`` as a UTF-8, 2-space-indented JSON array.

    Non-ASCII characters are written as-is rather than escaped.
    """
    records = [video.to_dict() for video in items]
    payload = json.dumps(records, ensure_ascii=False, indent=2)
    output.write_text(payload, encoding="utf-8")


def write_csv(items: list[VideoData], output: Path) -> None:
    """Write ``items`` to ``output`` as UTF-8 CSV with a header row.

    Column names come from the first item's ``to_dict()`` keys; each item's
    tags are joined with ``|`` into one cell.  With no items, an empty file
    (no header) is written.
    """
    if not items:
        output.write_text("", encoding="utf-8")
        return

    def flatten(video) -> dict:
        # CSV cells are scalar, so collapse the tag list into one string.
        record = video.to_dict()
        record["tags"] = "|".join(record["tags"])
        return record

    header = list(items[0].to_dict())
    with output.open("w", newline="", encoding="utf-8") as handle:
        sheet = csv.DictWriter(handle, fieldnames=header)
        sheet.writeheader()
        sheet.writerows(flatten(video) for video in items)


def main() -> None:
    """Command-line entry point with ``search`` and ``hot`` sub-commands.

    Parses the arguments, runs the chosen scraper method, prints one summary
    line per result and, when ``--output`` is given, saves the results as
    JSON or CSV according to ``--format``.
    """

    def add_result_options(command, default_limit: int) -> None:
        # Options that both sub-commands share, registered in the same order.
        command.add_argument("--limit", "-n", type=int, default=default_limit, help="结果数量")
        command.add_argument("--output", "-o", help="输出文件路径")
        command.add_argument("--format", "-f", choices=["json", "csv"], default="json")

    parser = argparse.ArgumentParser(description="抖音爆款爬虫")
    commands = parser.add_subparsers(dest="command", required=True)

    search_cmd = commands.add_parser("search", help="搜索关键词")
    search_cmd.add_argument("--keyword", "-k", required=True, help="搜索关键词")
    add_result_options(search_cmd, 10)
    search_cmd.add_argument("--headless", action="store_true", default=True)
    search_cmd.add_argument("--no-headless", dest="headless", action="store_false")
    search_cmd.add_argument("--delay", type=float, default=2.0, help="请求间隔(秒)")

    hot_cmd = commands.add_parser("hot", help="获取热榜")
    hot_cmd.add_argument("--category", "-c", default="", help="分类")
    add_result_options(hot_cmd, 20)

    args = parser.parse_args()
    # The "hot" sub-command defines neither option, so fall back to defaults.
    options = vars(args)
    scraper = DouyinScraper(
        headless=options.get("headless", True),
        delay=options.get("delay", 2.0),
    )

    items = (
        scraper.search(args.keyword, args.limit)
        if args.command == "search"
        else scraper.hot(args.category, args.limit)
    )

    for rank, video in enumerate(items, start=1):
        print(f"{rank}. {video.title} | {video.author} | 👍{video.like_count} | {video.url}")

    if not args.output:
        return

    save = write_csv if args.format == "csv" else write_json
    save(items, Path(args.output))
    print(f"💾 已保存到: {args.output}")


# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()