文件预览

collect_browser.py

查看 Social Media Data Collector 技能包中的文件内容。

文件内容

scripts/collect_browser.py

#!/usr/bin/env python3
"""
Browser-tier data collection using Playwright headless.
For platforms without API access: 百家号, 汽车之家, 易车, 视频号, 斗鱼, 皮皮虾, 懂车帝.

Usage:
    python3 collect_browser.py --config config.json
    python3 collect_browser.py --platform 百家号 --url "https://mbd.baidu.com/..."

Config format:
{
    "platforms": {
        "百家号": {"url": "https://mbd.baidu.com/...", "record_id": "recXXX"},
        ...
    }
}
"""

import json
import sys
import os
import re
import time
import argparse


def extract_baijiahao(page):
    """Extract Baijiahao (百家号) metrics from the rendered page text.

    The visible body text is split into stripped, non-empty lines and every
    line is scanned; when a marker occurs more than once the last one wins.

    * likes    - the line directly above a line that is exactly "收藏",
                 kept only when it parses as a plain integer.
    * plays    - the figure in front of "次播放", returned as text together
                 with its "万" suffix when present ("12.3万" or "5000").
    * comments - N from "评论列表(N条)"; ASCII and full-width parentheses
                 are both accepted.

    Args:
        page: Object exposing ``inner_text(selector)`` (a Playwright page).

    Returns:
        dict holding whichever of "likes" (int), "plays" (str) and
        "comments" (int) were found; empty when nothing matched.
    """
    text = page.inner_text("body")
    lines = [raw.strip() for raw in text.split('\n') if raw.strip()]
    metrics = {}

    for i, line in enumerate(lines):
        if line == "收藏" and i > 0:
            try:
                metrics["likes"] = int(lines[i - 1])
            except ValueError:
                # Best effort: abbreviated counts such as "1.2万" are skipped.
                pass

        # The "万" suffix is optional so that counts below 10,000
        # ("5000次播放") are captured as well.
        plays = re.search(r'([\d.]+万?)次播放', line)
        if plays:
            metrics["plays"] = plays.group(1)

        # Accept both ASCII "()" and full-width "()" around the count.
        comments = re.search(r'评论列表[((](\d+)条[))]', line)
        if comments:
            metrics["comments"] = int(comments.group(1))

    return metrics


def extract_autohome(page):
    """Extract Autohome (汽车之家) interaction counts and play count.

    A script run in the page locates the first container whose class contains
    both "tw-mt-4" and "tw-flex" (found via a descendant <span>) and reports,
    for each child, its text and a type guessed from the icon reference
    (like / comment / collect / share). Children whose type is recognised and
    whose text is a plain integer become metrics. The play count is read from
    the body text as the figure in front of "播放".

    Args:
        page: Object exposing ``evaluate(script)`` and ``inner_text(selector)``.

    Returns:
        dict with any of "likes", "comments", "collects", "shares" (int)
        and "plays" (str, e.g. "3.4万").
    """
    dom_script = """() => {
        const spans = document.querySelectorAll('span');
        for (const span of spans) {
            const parent = span.closest('[class*="tw-mt-4"][class*="tw-flex"]');
            if (parent) {
                const children = parent.children;
                const items = [];
                for (const child of children) {
                    const svgs = child.querySelectorAll('svg use, img');
                    let type = 'unknown';
                    for (const svg of svgs) {
                        const href = svg.getAttribute('href') || svg.getAttribute('xlink:href') || svg.getAttribute('src') || '';
                        if (href.includes('like')) type = 'likes';
                        else if (href.includes('comment')) type = 'comments';
                        else if (href.includes('collect')) type = 'collects';
                        else if (href.includes('share')) type = 'shares';
                    }
                    items.push({type, text: child.innerText.trim()});
                }
                return items;
            }
        }
        return [];
    }"""
    buttons = page.evaluate(dom_script)

    metrics = {}
    for button in buttons:
        # Skip buttons with no label or an unrecognised icon.
        if not button["text"] or button["type"] == "unknown":
            continue
        try:
            metrics[button["type"]] = int(button["text"])
        except ValueError:
            # Non-integer labels (e.g. "1.2万") are ignored.
            pass

    # The play count is not part of the button row; take it from the page text.
    play_match = re.search(r'([\d.]+万?)播放', page.inner_text("body"))
    if play_match is not None:
        metrics["plays"] = play_match.group(1)

    return metrics


def extract_yiche(page):
    """Extract the like count from a Yiche (易车) page.

    The body text is reduced to stripped, non-empty lines. The first line
    equal to "写评论" that has a predecessor ends the search; the predecessor
    is taken as the like count when it is a plain integer.

    Args:
        page: Object exposing ``inner_text(selector)``.

    Returns:
        ``{"likes": int}`` on success, otherwise an empty dict.
    """
    body = page.inner_text("body")
    rows = [row.strip() for row in body.split('\n') if row.strip()]
    metrics = {}

    # Walk adjacent pairs so the marker always has a preceding line.
    for previous, current in zip(rows, rows[1:]):
        if current != "写评论":
            continue
        try:
            metrics["likes"] = int(previous)
        except ValueError:
            pass
        break  # only the first marker is considered

    return metrics


def extract_wechat_video(page):
    """Extract WeChat Channels (视频号) counters from the page text.

    Looks for the first line containing "可扫码" that has at least four lines
    before it, then reads the four preceding lines, nearest first, as
    collects, shares, comments and likes. Conversion stops at the first line
    that is not an integer; values converted before that point are kept.

    Args:
        page: Object exposing ``inner_text(selector)``.

    Returns:
        dict with up to four int entries: "collects", "shares", "comments",
        "likes".
    """
    body = page.inner_text("body")
    rows = [row.strip() for row in body.split('\n') if row.strip()]
    metrics = {}

    marker_at = next(
        (pos for pos, row in enumerate(rows) if pos >= 4 and "可扫码" in row),
        None,
    )
    if marker_at is None:
        return metrics

    # Fields listed in order of increasing distance above the marker line.
    fields_above = ("collects", "shares", "comments", "likes")
    try:
        for distance, field in enumerate(fields_above, start=1):
            metrics[field] = int(rows[marker_at - distance])
    except (ValueError, IndexError):
        pass

    return metrics


def extract_douyu(page):
    """Extract the play count from a Douyu (斗鱼) page.

    Searches the page HTML for the first ``"play_count": <digits>`` pair.
    Counts above 10000 are rendered in units of 万 with one decimal place;
    other counts are rendered as the plain number.

    Args:
        page: Object exposing ``content()`` returning the page HTML.

    Returns:
        ``{"plays": str}`` when the field is present, otherwise ``{}``.
    """
    found = re.search(r'"play_count"\s*:\s*(\d+)', page.content())
    if found is None:
        return {}

    plays = int(found.group(1))
    shown = f"{plays/10000:.1f}万" if plays > 10000 else str(plays)
    return {"plays": shown}


def extract_pipixia(page):
    """Extract Pipixia (皮皮虾) counters from JSON embedded in the page HTML.

    For each of ``digg_count``, ``comment_count`` and ``share_count`` the
    first ``"<key>": <digits>`` occurrence in the HTML is used.

    Args:
        page: Object exposing ``content()`` returning the page HTML.

    Returns:
        dict with any of "likes", "comments", "shares" as ints.
    """
    html = page.content()
    field_by_key = {
        "digg_count": "likes",
        "comment_count": "comments",
        "share_count": "shares",
    }
    hits = (
        (field, re.search(rf'"{key}"\s*:\s*(\d+)', html))
        for key, field in field_by_key.items()
    )
    return {field: int(hit.group(1)) for field, hit in hits if hit}


def extract_dongchedi(page):
    """Extract Dongchedi (懂车帝) counters from ``window.__INITIAL_STATE__``.

    The page HTML is searched for the ``window.__INITIAL_STATE__ = {``
    assignment and the JSON object that follows is decoded. Each of
    ``digg_count``, ``comment_count``, ``share_count`` and ``read_count`` is
    then looked up in the decoded data with ``_find_in_nested``.

    The object is decoded with ``JSONDecoder.raw_decode`` rather than captured
    by a regex up to the first "};": that sequence can also occur inside the
    state (for example in an embedded string), which would truncate the
    capture and lose every metric.

    Args:
        page: Object exposing ``content()`` returning the page HTML.

    Returns:
        dict with any of "likes", "comments", "shares", "plays"; values are
        whatever the state stores under the corresponding key. Empty when the
        state is missing or is not valid JSON.
    """
    html = page.content()
    metrics = {}

    # The lookahead leaves the match ending exactly on the opening brace,
    # which is where raw_decode must start.
    marker = re.search(r'window\.__INITIAL_STATE__\s*=\s*(?=\{)', html)
    if not marker:
        return metrics

    try:
        data, _end = json.JSONDecoder().raw_decode(html, marker.end())
    except json.JSONDecodeError:
        # Best effort: a state that is not strict JSON yields no metrics.
        return metrics

    for key, field in [("digg_count", "likes"), ("comment_count", "comments"),
                       ("share_count", "shares"), ("read_count", "plays")]:
        val = _find_in_nested(data, key)
        if val is not None:
            metrics[field] = val

    return metrics


def _find_in_nested(data, key, max_depth=5):
    """Find a key in arbitrarily nested dict/list."""
    if max_depth <= 0:
        return None
    if isinstance(data, dict):
        if key in data:
            return data[key]
        for v in data.values():
            result = _find_in_nested(v, key, max_depth - 1)
            if result is not None:
                return result
    elif isinstance(data, list):
        for item in data[:10]:  # Limit list traversal
            result = _find_in_nested(item, key, max_depth - 1)
            if result is not None:
                return result
    return None


# Dispatch table: platform name (as given on the command line or used as a
# key in the config file) -> function that takes a loaded page and returns a
# dict of metrics. collect_platform() looks extractors up here.
EXTRACTORS = {
    "百家号": extract_baijiahao,
    "汽车之家": extract_autohome,
    "易车": extract_yiche,
    "视频号": extract_wechat_video,
    "斗鱼": extract_douyu,
    "皮皮虾": extract_pipixia,
    "懂车帝": extract_dongchedi,
}


def collect_platform(context, platform, url):
    """Load *url* in a fresh page and run the extractor for *platform*.

    Args:
        context: Browser context used to open the page (``new_page()``).
        platform: Key into ``EXTRACTORS``.
        url: Address to navigate to.

    Returns:
        ``{"metrics": {...}}`` on success, or ``{"error": "<message>"}`` when
        no extractor is registered or navigation/extraction raised. The page
        is closed in either case.
    """
    extract_metrics = EXTRACTORS.get(platform)
    if not extract_metrics:
        return {"error": f"No extractor for {platform}"}

    tab = context.new_page()
    try:
        tab.goto(url, timeout=25000, wait_until="networkidle")
        # Give late-rendering content a moment after the network goes idle.
        time.sleep(3)
        outcome = {"metrics": extract_metrics(tab)}
    except Exception as exc:
        # Boundary handler: one failing platform is reported, not raised.
        outcome = {"error": str(exc)}
    finally:
        tab.close()
    return outcome


def main():
    """Command-line entry point: scrape the requested platforms and save JSON.

    Platforms come either from ``--config`` (a JSON file with a top-level
    "platforms" mapping of name -> {"url": ..., "record_id": ...} or
    name -> url) or from a single ``--platform`` / ``--url`` pair. Each
    platform is scraped with ``collect_platform`` in one shared headless
    Chromium context, and the list of results (each tagged with "platform"
    and "record_id") is written to ``--output`` as UTF-8 JSON.

    Exits with status 1 when Playwright is not installed or when neither a
    config file nor a platform/url pair is given. An entry without a usable
    URL is recorded as an error result instead of aborting the run.
    """
    parser = argparse.ArgumentParser(description="Collect social media metrics via browser")
    parser.add_argument("--config", help="Config JSON file")
    parser.add_argument("--platform", help="Single platform name")
    parser.add_argument("--url", help="Single URL to scrape")
    parser.add_argument("--output", default="/tmp/sm-collect/browser-results.json")
    args = parser.parse_args()

    # dirname() is "" for a bare filename and os.makedirs("") raises, so only
    # create the directory when the output path actually has one.
    output_dir = os.path.dirname(args.output)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Imported here so a missing dependency yields a readable hint.
    try:
        from playwright.sync_api import sync_playwright
    except ImportError:
        print("ERROR: playwright not installed. Run: pip install playwright && playwright install chromium")
        sys.exit(1)

    # Build platform list
    platforms = {}
    if args.config:
        # Platform names are Chinese text; do not depend on the locale encoding.
        with open(args.config, encoding="utf-8") as f:
            config = json.load(f)
        platforms = config.get("platforms", {})
    elif args.platform and args.url:
        platforms = {args.platform: {"url": args.url}}
    else:
        print("ERROR: Provide --config or --platform + --url")
        sys.exit(1)

    results = []

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        try:
            context = browser.new_context(
                viewport={"width": 1280, "height": 900},
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
                locale="zh-CN"
            )

            for platform, info in platforms.items():
                if isinstance(info, dict):
                    url = info.get("url")
                    record_id = info.get("record_id")
                else:
                    url = info
                    record_id = None

                if isinstance(url, str) and url:
                    print(f"[{platform}] Scraping {url[:60]}...", end=" ", flush=True)
                    result = collect_platform(context, platform, url)
                else:
                    # A malformed entry must not discard the other results.
                    print(f"[{platform}]", end=" ", flush=True)
                    result = {"error": "Missing or invalid url in config"}

                result["platform"] = platform
                result["record_id"] = record_id
                results.append(result)

                if "error" in result:
                    print(f"❌ {result['error']}")
                else:
                    print(f"✅ {result['metrics']}")
        finally:
            # Close the browser even if something in the loop raised.
            browser.close()

    # ensure_ascii=False writes raw Chinese characters, so the file must be
    # opened as UTF-8 explicitly.
    with open(args.output, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"\nResults saved to {args.output}")


# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()