文件预览

scraper.py

查看 抖音爆款爬虫 Pro 技能包中的文件内容。

文件内容

scripts/scraper.py

#!/usr/bin/env python3
"""Douyin viral-video scraper - Python version; uses Playwright mobile emulation to extract real data."""

import argparse
import csv
import json
import re
import sys
import time
from dataclasses import asdict, dataclass, field
from datetime import date
from pathlib import Path
from urllib.parse import quote, urljoin

try:
    from playwright.sync_api import sync_playwright
except Exception:
    sync_playwright = None


@dataclass
class VideoData:
    """Container for the fields scraped for a single video.

    Every field has a default, so an empty record can be created and then
    filled in attribute by attribute. Field order is significant: it is the
    positional-argument order and the key order produced by ``to_dict``.
    """

    # Text shown for the video.
    title: str = ""
    description: str = ""
    author: str = ""
    # Engagement counters; they stay 0 until a value is assigned.
    play_count: int = 0
    like_count: int = 0
    comment_count: int = 0
    share_count: int = 0
    url: str = ""
    # default_factory gives each instance its own list object.
    tags: list = field(default_factory=list)
    publish_time: str = ""

    def to_dict(self) -> dict:
        """Return the record as a plain ``dict`` keyed by field name."""
        record = asdict(self)
        return record


def _parse_count(text: str) -> int:
    """Parse Chinese count strings like '1.2万' or '3.5亿' to int."""
    if not text:
        return 0
    text = text.strip().replace(",", "")
    try:
        return int(float(text))
    except ValueError:
        pass
    m = re.match(r"([\d.]+)\s*万", text)
    if m:
        return int(float(m.group(1)) * 10000)
    m = re.match(r"([\d.]+)\s*亿", text)
    if m:
        return int(float(m.group(1)) * 100000000)
    nums = re.findall(r"\d+", text)
    if nums:
        return int(nums[0])
    return 0


# User-Agent header value handed to the browser context; it identifies the
# client as Safari on an iPhone running iOS 17.
MOBILE_UA = " ".join(
    (
        "Mozilla/5.0 (iPhone; CPU iPhone OS 17_0 like Mac OS X)",
        "AppleWebKit/605.1.15 (KHTML, like Gecko)",
        "Version/17.0",
        "Mobile/15E148",
        "Safari/604.1",
    )
)


class DouyinScraper:
    """Scrape Douyin search, hot-list and single-video pages via Playwright.

    Each public method (``search``, ``hot``, ``analyze``) starts its own
    Chromium instance with a mobile browser context, loads one page, reads
    data out of the rendered DOM and closes the browser again. Errors are
    reported on stderr and converted into an empty result instead of being
    raised to the caller.

    Args:
        headless: Forwarded to ``chromium.launch``.
        delay: Seconds to sleep once the awaited content selector has
            appeared, before the page is read.
    """

    _BASE_URL = "https://www.douyin.com"
    _VIDEO_LINK_SELECTOR = 'a[href*="/video/"]'
    _AD_MARKER = "广告"
    # A stats line is a bare, well-formed number with an optional 万/亿 unit.
    _STAT_LINE = re.compile(r"^\d+(?:\.\d+)?[万亿]?$")
    # Order in which numeric lines of a search card are assigned to fields.
    # NOTE(review): this order is inherited from the original implementation;
    # confirm it against the live card layout.
    _STAT_FIELDS = ("like_count", "comment_count", "play_count", "share_count")

    def __init__(self, headless: bool = True, delay: float = 2.0):
        self.headless = headless
        self.delay = delay

    # ------------------------------------------------------------------
    # URL helpers
    # ------------------------------------------------------------------
    @classmethod
    def _absolute_url(cls, href: str) -> str:
        """Resolve *href* against the site root; an empty href stays empty.

        ``urljoin`` handles root-relative (``/video/1``), protocol-relative
        (``//host/video/1``) and already-absolute links alike.
        """
        if not href:
            return ""
        return urljoin(cls._BASE_URL + "/", href)

    @classmethod
    def _search_url(cls, keyword: str) -> str:
        """Build the search URL with *keyword* escaped as one path segment."""
        # safe="" so that '/', '?' and '#' inside the keyword cannot alter
        # the URL structure.
        return f"{cls._BASE_URL}/search/{quote(keyword, safe='')}"

    @classmethod
    def _hot_url(cls, category: str) -> str:
        """Build the hot-list URL, optionally narrowed to *category*."""
        url = f"{cls._BASE_URL}/hot"
        if category:
            # Default safe="/" keeps a multi-segment category working.
            url += f"/{quote(category)}"
        return url

    # ------------------------------------------------------------------
    # Browser / page helpers
    # ------------------------------------------------------------------
    def _launch_browser(self, p):
        """Launch Chromium and return ``(browser, page)`` for a mobile context.

        Args:
            p: The object yielded by ``sync_playwright()``.
        """
        browser = p.chromium.launch(
            headless=self.headless,
            args=[
                "--disable-blink-features=AutomationControlled",
                # NOTE(review): the two flags below turn off Chromium's
                # sandbox; presumably needed for container/root runs.
                "--no-sandbox",
                "--disable-setuid-sandbox",
            ],
        )
        context = browser.new_context(
            user_agent=MOBILE_UA,
            viewport={"width": 390, "height": 844},
            locale="zh-CN",
            is_mobile=True,
        )
        page = context.new_page()
        return browser, page

    def _visit(self, url: str, extract, error_label: str):
        """Open *url* in a fresh browser and return ``extract(page)``.

        Returns ``None`` when launching, navigating or extracting fails; the
        error is printed to stderr prefixed with *error_label*. The browser
        is closed in every case once it has been launched.
        """
        with sync_playwright() as p:
            browser = None
            try:
                # Launch inside the try so that a missing browser binary is
                # reported like any other failure instead of a traceback.
                browser, page = self._launch_browser(p)
                page.goto(url, wait_until="domcontentloaded", timeout=30000)
                return extract(page)
            except Exception as e:
                print(f"⚠️  {error_label}: {e}", file=sys.stderr)
                return None
            finally:
                if browser is not None:
                    browser.close()

    def _wait_for(self, page, selector: str) -> None:
        """Wait for *selector*, then sleep ``self.delay``; never raises.

        Extraction is still attempted when the wait fails, so a failure is
        only reported.
        """
        try:
            page.wait_for_selector(selector, timeout=15000)
        except Exception:
            print(f"⚠️  等待页面元素失败: {selector}", file=sys.stderr)
            return
        time.sleep(self.delay)

    @staticmethod
    def _scroll(page, times: int) -> None:
        """Scroll down in 800px steps, pausing 1.5s after each step."""
        for step in range(1, times + 1):
            page.evaluate(f"window.scrollTo(0, {step * 800})")
            time.sleep(1.5)

    @staticmethod
    def _first_text(page, selector: str) -> str:
        """Return the stripped text of the first match, or "" if unavailable."""
        try:
            element = page.query_selector(selector)
            return element.inner_text().strip() if element else ""
        except Exception as e:
            print(f"⚠️  读取页面元素失败 ({selector}): {e}", file=sys.stderr)
            return ""

    # ------------------------------------------------------------------
    # Card parsing
    # ------------------------------------------------------------------
    @staticmethod
    def _card_lines(card) -> list:
        """Return the card's visible text as stripped, non-empty lines."""
        return [ln.strip() for ln in card.inner_text().split("\n") if ln.strip()]

    @classmethod
    def _is_ad(cls, lines: list) -> bool:
        """Tell whether any line of the card contains the ad marker."""
        # NOTE(review): a regular video whose text contains the marker is
        # skipped as well.
        return any(cls._AD_MARKER in ln for ln in lines)

    @classmethod
    def _card_url(cls, card) -> str:
        """Return the absolute URL of the card's video link, or ""."""
        link = card.query_selector(cls._VIDEO_LINK_SELECTOR)
        if not link:
            return ""
        return cls._absolute_url(link.get_attribute("href") or "")

    def _parse_search_card(self, card):
        """Turn one search-result card into a ``VideoData``; None to skip it."""
        lines = self._card_lines(card)
        if self._is_ad(lines):
            return None

        # Lines are read positionally: author, date, title, then stats.
        author, date_str, title = (lines + ["", "", ""])[:3]
        if not title:
            return None

        video = VideoData(
            title=title,
            description=title,
            author=author,
            publish_time=date_str,
            url=self._card_url(card),
            tags=re.findall(r"#(\S+)", title),
        )
        # Remaining numeric lines are the counters; the shared parser also
        # understands the 亿 unit.
        stats = [_parse_count(s) for s in lines[3:] if self._STAT_LINE.match(s)]
        for field_name, value in zip(self._STAT_FIELDS, stats):
            setattr(video, field_name, value)
        return video

    def _parse_hot_card(self, card):
        """Turn one hot-list card into a ``VideoData``; None to skip it."""
        lines = self._card_lines(card)
        if not lines or self._is_ad(lines):
            return None
        return VideoData(
            title=lines[2] if len(lines) > 2 else lines[0],
            author=lines[0],
            url=self._card_url(card),
            tags=["热榜"],
            # This is the scrape date, not a date read from the page.
            publish_time=date.today().isoformat(),
        )

    @staticmethod
    def _collect(cards, parse_card, limit: int) -> list:
        """Parse *cards* until *limit* videos are collected.

        Skipped cards (ads, empty titles, parse errors) do not count toward
        the limit.
        """
        videos = []
        for card in cards:
            if len(videos) >= limit:
                break
            try:
                video = parse_card(card)
            except Exception as e:
                print(f"⚠️  跳过无法解析的卡片: {e}", file=sys.stderr)
                continue
            if video is not None:
                videos.append(video)
        return videos

    # ------------------------------------------------------------------
    # Page extractors
    # ------------------------------------------------------------------
    def _extract_search_results(self, page, keyword: str, limit: int) -> list:
        """Extract up to *limit* videos from a loaded search page.

        Args:
            page: Playwright page already navigated to the search URL.
            keyword: The search keyword (kept for interface compatibility;
                not used during extraction).
            limit: Maximum number of videos; values <= 0 yield an empty list.
        """
        if limit <= 0:
            return []
        self._wait_for(page, '[class*="h5-video-card"], [class*="video-card"]')
        # Scroll to load more results: one step per five requested videos.
        self._scroll(page, max(1, limit // 5))

        cards = page.query_selector_all('[class*="h5-video-card"]')
        if not cards:
            cards = page.query_selector_all('[class*="video-card"]')
        return self._collect(cards, self._parse_search_card, limit)

    def _extract_hot_results(self, page, limit: int) -> list:
        """Extract up to *limit* entries from a loaded hot-list page.

        Values of *limit* <= 0 yield an empty list.
        """
        if limit <= 0:
            return []
        self._wait_for(page, '[class*="hot"], [class*="trending"]')
        self._scroll(page, 3)

        cards = page.query_selector_all('[class*="h5-video-card"]')
        if not cards:
            cards = page.query_selector_all('[class*="hot-item"], [class*="trending-item"]')
        return self._collect(cards, self._parse_hot_card, limit)

    def _extract_video_detail(self, page, url: str) -> "VideoData":
        """Extract title, description, tags and author from a video page.

        Fields that cannot be read keep their defaults. ``publish_time`` is
        set to today's date (the scrape date), not a value from the page.
        """
        video = VideoData(url=url, tags=[])
        self._wait_for(page, '[class*="desc"], [class*="detail"]')

        text = self._first_text(page, '[class*="desc"], [class*="caption"], [class*="title"]')
        if text:
            video.title = text[:100]
            video.description = text
            video.tags = re.findall(r"#(\S+)", text)

        video.author = self._first_text(page, '[class*="author"], [class*="nickname"]')
        video.publish_time = date.today().isoformat()
        return video

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def search(self, keyword: str, limit: int) -> list:
        """Search for *keyword* and return up to *limit* ``VideoData`` items.

        Returns an empty list when Playwright is missing or an error occurs.
        """
        if sync_playwright is None:
            print(
                "⚠️  Playwright 未安装,请运行: "
                "pip install playwright && playwright install chromium",
                file=sys.stderr,
            )
            return []

        print(f"🔍 搜索: {keyword}", file=sys.stderr)
        videos = self._visit(
            self._search_url(keyword),
            lambda page: self._extract_search_results(page, keyword, limit),
            "搜索出错",
        )
        if videos is None:
            return []
        print(f"✅ 获取到 {len(videos)} 条结果", file=sys.stderr)
        return videos

    def hot(self, category: str, limit: int) -> list:
        """Return up to *limit* hot-list entries, optionally for *category*.

        Returns an empty list when Playwright is missing or an error occurs.
        """
        if sync_playwright is None:
            print("⚠️  Playwright 未安装", file=sys.stderr)
            return []

        print("🔥 获取热榜", file=sys.stderr)
        videos = self._visit(
            self._hot_url(category),
            lambda page: self._extract_hot_results(page, limit),
            "获取热榜出错",
        )
        if videos is None:
            return []
        print(f"✅ 获取到 {len(videos)} 条热榜数据", file=sys.stderr)
        return videos

    def analyze(self, url: str) -> "VideoData":
        """Load the video page at *url* and return its details.

        Returns a ``VideoData`` carrying only the URL when Playwright is
        missing or an error occurs.
        """
        if sync_playwright is None:
            print("⚠️  Playwright 未安装", file=sys.stderr)
            return VideoData(url=url)

        print(f"🎬 分析视频: {url}", file=sys.stderr)
        video = self._visit(
            url,
            lambda page: self._extract_video_detail(page, url),
            "分析出错",
        )
        if video is None:
            return VideoData(url=url)
        print("✅ 分析完成", file=sys.stderr)
        return video


def write_json(items, output: Path) -> None:
    """Write *items* to *output* as a pretty-printed UTF-8 JSON array.

    Args:
        items: Iterable of records. Objects exposing ``to_dict()`` are
            converted through it; anything else is serialized as-is.
        output: Destination file. Missing parent directories are created.

    Raises:
        TypeError: If a record is not JSON-serializable.
        OSError: If the directory or file cannot be written.
    """
    data = [item.to_dict() if hasattr(item, "to_dict") else item for item in items]
    # Serialize before touching the filesystem so a bad record leaves no
    # half-created output behind.
    payload = json.dumps(data, ensure_ascii=False, indent=2)
    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(payload, encoding="utf-8")


def write_csv(items, output: Path) -> None:
    """Write *items* to *output* as a UTF-8 CSV file with a header row.

    Args:
        items: Iterable of records. Objects exposing ``to_dict()`` are
            converted through it; anything else must already be a mapping.
        output: Destination file. Missing parent directories are created.
            When there are no records an empty file is written.

    The header is the ordered union of all record keys; a record missing a
    key gets an empty cell. A list/tuple ``tags`` value is flattened to a
    ``|``-separated string.
    """
    rows = [item.to_dict() if hasattr(item, "to_dict") else item for item in items]
    output.parent.mkdir(parents=True, exist_ok=True)
    if not rows:
        output.write_text("", encoding="utf-8")
        return

    # dict.fromkeys keeps first-seen order while removing duplicates.
    fieldnames = list(dict.fromkeys(key for row in rows for key in row))
    with output.open("w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for source in rows:
            row = dict(source)
            tags = row.get("tags")
            # Only flatten real sequences: adding a "tags" key to a row that
            # never had one would be rejected by DictWriter, and joining a
            # plain string would split it into characters.
            if isinstance(tags, (list, tuple)):
                row["tags"] = "|".join(str(tag) for tag in tags)
            writer.writerow(row)


def main() -> None:
    """Command-line entry point.

    Sub-commands:
        search  --keyword K [--limit N]   search by keyword
        hot     [--category C] [--limit N] fetch the hot list
        analyze --url U                   inspect a single video page

    Every sub-command prints its result as JSON on stdout and, when
    ``--output`` is given, also saves it in the chosen ``--format``.
    ``search`` and ``hot`` skip saving when nothing was found, so an existing
    file is not replaced by an empty result.
    """

    def positive_int(raw: str) -> int:
        """argparse type: accept integers >= 1 only."""
        try:
            value = int(raw)
        except ValueError:
            raise argparse.ArgumentTypeError(f"invalid int value: {raw!r}") from None
        if value < 1:
            # Zero returns nothing and a negative value would slice results
            # from the end of the list; reject both up front.
            raise argparse.ArgumentTypeError(f"must be >= 1, got {value}")
        return value

    def add_output_options(sub_parser) -> None:
        """Register the --output/--format options shared by all commands."""
        sub_parser.add_argument("--output", help="输出文件路径")
        sub_parser.add_argument("--format", choices=["json", "csv"], default="json")

    parser = argparse.ArgumentParser(description="抖音爆款爬虫")
    sub = parser.add_subparsers(dest="command", required=True)

    search_p = sub.add_parser("search", help="搜索关键词")
    search_p.add_argument("--keyword", required=True, help="搜索关键词")
    search_p.add_argument("--limit", type=positive_int, default=10, help="结果数量")
    add_output_options(search_p)

    hot_p = sub.add_parser("hot", help="获取热榜")
    hot_p.add_argument("--category", default="", help="分类")
    hot_p.add_argument("--limit", type=positive_int, default=20, help="结果数量")
    add_output_options(hot_p)

    analyze_p = sub.add_parser("analyze", help="分析视频链接")
    analyze_p.add_argument("--url", required=True, help="视频链接")
    add_output_options(analyze_p)

    args = parser.parse_args()
    scraper = DouyinScraper()

    if args.command == "analyze":
        video = scraper.analyze(args.url)
        items = [video]
        payload = video.to_dict()
        # A single-video analysis is always saved when --output is given.
        should_save = bool(args.output)
    else:
        if args.command == "search":
            items = scraper.search(args.keyword, args.limit)
        else:
            items = scraper.hot(args.category, args.limit)
        payload = [v.to_dict() for v in items]
        should_save = bool(args.output) and bool(items)

    print(json.dumps(payload, ensure_ascii=False, indent=2))
    if should_save:
        save = write_json if args.format == "json" else write_csv
        save(items, Path(args.output))

# Run the CLI only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()