文件预览

watcher.py

查看 Bilibili All In One 技能包中的文件内容。

文件内容

src/watcher.py

"""Bilibili video watching and stats tracking module."""

import asyncio
import time
from typing import Optional, Dict, Any, List

import httpx

from .auth import BilibiliAuth
from .utils import (
    API_VIDEO_INFO,
    API_VIDEO_DETAIL,
    DEFAULT_HEADERS,
    extract_bvid,
    format_number,
    format_duration,
    parse_video_url,
)


class BilibiliWatcher:
    """Watch and monitor Bilibili videos.

    Track view counts, comments, likes, and other engagement metrics over time.
    """

    def __init__(self, auth: Optional[BilibiliAuth] = None):
        """Initialize BilibiliWatcher.

        Args:
            auth: Optional BilibiliAuth instance for authenticated requests.
        """
        self.auth = auth
        self._tracking_data: Dict[str, List[Dict]] = {}

    def _get_client(self) -> httpx.AsyncClient:
        """Get an HTTP client, using auth if available."""
        if self.auth:
            return self.auth.get_client()
        return httpx.AsyncClient(
            headers=DEFAULT_HEADERS,
            timeout=30.0,
            follow_redirects=True,
        )

    async def watch(self, url: str) -> Dict[str, Any]:
        """Get detailed video information for watching.

        Args:
            url: Video URL (supports Bilibili).

        Returns:
            Detailed video information.
        """
        video_info = parse_video_url(url)

        if video_info["platform"] == "bilibili":
            return await self._watch_bilibili(video_info.get("bvid"), url)
        else:
            return {"success": False, "message": f"Unsupported platform for URL: {url}"}

    async def _watch_bilibili(self, bvid: Optional[str], url: str) -> Dict[str, Any]:
        """Get detailed Bilibili video information.

        Args:
            bvid: BV ID of the video.
            url: Original URL.

        Returns:
            Video details dict.
        """
        if not bvid:
            bvid = extract_bvid(url)
        if not bvid:
            return {"success": False, "message": f"Cannot extract BV ID from: {url}"}

        async with self._get_client() as client:
            resp = await client.get(API_VIDEO_DETAIL, params={"bvid": bvid})
            data = resp.json()

        if data.get("code") != 0:
            return {"success": False, "message": data.get("message", "API error")}

        video = data["data"].get("View", {})
        stat = video.get("stat", {})
        owner = video.get("owner", {})
        tags = [t.get("tag_name") for t in data["data"].get("Tags", []) if t.get("tag_name")]
        related = []
        for r in data["data"].get("Related", [])[:5]:
            related.append({
                "bvid": r.get("bvid"),
                "title": r.get("title"),
                "author": r.get("owner", {}).get("name"),
            })

        return {
            "success": True,
            "platform": "bilibili",
            "bvid": video.get("bvid"),
            "aid": video.get("aid"),
            "title": video.get("title"),
            "description": video.get("desc"),
            "cover": video.get("pic"),
            "duration": format_duration(video.get("duration", 0)),
            "author": {
                "mid": owner.get("mid"),
                "name": owner.get("name"),
                "face": owner.get("face"),
            },
            "stats": {
                "views": stat.get("view", 0),
                "views_formatted": format_number(stat.get("view", 0)),
                "danmaku": stat.get("danmaku", 0),
                "likes": stat.get("like", 0),
                "coins": stat.get("coin", 0),
                "favorites": stat.get("favorite", 0),
                "shares": stat.get("share", 0),
                "comments": stat.get("reply", 0),
            },
            "tags": tags,
            "related_videos": related,
            "url": f"https://www.bilibili.com/video/{video.get('bvid')}",
        }

    async def get_stats(self, url: str) -> Dict[str, Any]:
        """Get current engagement statistics for a video.

        Args:
            url: Video URL.

        Returns:
            Current engagement statistics.
        """
        bvid = extract_bvid(url)
        if not bvid:
            return {"success": False, "message": f"Invalid URL or BV number: {url}"}

        async with self._get_client() as client:
            resp = await client.get(API_VIDEO_INFO, params={"bvid": bvid})
            data = resp.json()

        if data.get("code") != 0:
            return {"success": False, "message": data.get("message", "API error")}

        stat = data["data"].get("stat", {})
        return {
            "success": True,
            "bvid": bvid,
            "title": data["data"].get("title"),
            "timestamp": int(time.time()),
            "stats": {
                "views": stat.get("view", 0),
                "danmaku": stat.get("danmaku", 0),
                "likes": stat.get("like", 0),
                "coins": stat.get("coin", 0),
                "favorites": stat.get("favorite", 0),
                "shares": stat.get("share", 0),
                "comments": stat.get("reply", 0),
            },
        }

    async def track(
        self,
        url: str,
        interval: int = 60,
        duration: int = 24,
        callback=None,
    ) -> Dict[str, Any]:
        """Track video metrics over time.

        Args:
            url: Video URL.
            interval: Tracking interval in minutes.
            duration: Total tracking duration in hours.
            callback: Optional async callback function called with each data point.

        Returns:
            Tracking summary with all collected data points.
        """
        bvid = extract_bvid(url)
        if not bvid:
            return {"success": False, "message": f"Invalid URL or BV number: {url}"}

        data_points = []
        end_time = time.time() + duration * 3600
        interval_seconds = interval * 60

        while time.time() < end_time:
            stats = await self.get_stats(url)
            if stats.get("success"):
                data_points.append(stats)
                if callback:
                    await callback(stats)

            self._tracking_data[bvid] = data_points

            remaining = end_time - time.time()
            if remaining <= 0:
                break
            await asyncio.sleep(min(interval_seconds, remaining))

        # Calculate changes
        summary = self._calculate_changes(data_points)

        return {
            "success": True,
            "bvid": bvid,
            "data_points": len(data_points),
            "duration_hours": duration,
            "interval_minutes": interval,
            "summary": summary,
            "data": data_points,
        }

    async def compare(self, urls: List[str]) -> Dict[str, Any]:
        """Compare engagement metrics of multiple videos.

        Args:
            urls: List of video URLs to compare.

        Returns:
            Comparison results.
        """
        results = []
        tasks = [self.get_stats(url) for url in urls]
        stats_list = await asyncio.gather(*tasks, return_exceptions=True)

        for url, stats in zip(urls, stats_list):
            if isinstance(stats, Exception):
                results.append({"url": url, "success": False, "message": str(stats)})
            else:
                results.append(stats)

        # Rank by views
        successful = [r for r in results if r.get("success")]
        successful.sort(key=lambda r: r.get("stats", {}).get("views", 0), reverse=True)

        return {
            "success": True,
            "total": len(urls),
            "compared": len(successful),
            "ranking": [
                {
                    "rank": i + 1,
                    "bvid": r.get("bvid"),
                    "title": r.get("title"),
                    "views": r.get("stats", {}).get("views", 0),
                    "likes": r.get("stats", {}).get("likes", 0),
                }
                for i, r in enumerate(successful)
            ],
            "results": results,
        }

    async def execute(self, action: str, **kwargs) -> Dict[str, Any]:
        """Execute a watcher action.

        Args:
            action: Action name ('watch', 'get_stats', 'track', 'compare').
            **kwargs: Additional parameters for the action.

        Returns:
            Action result dict.
        """
        actions = {
            "watch": self.watch,
            "get_stats": self.get_stats,
            "track": self.track,
            "compare": self.compare,
        }

        handler = actions.get(action)
        if not handler:
            return {"success": False, "message": f"Unknown action: {action}"}

        import inspect
        sig = inspect.signature(handler)
        valid_params = {k: v for k, v in kwargs.items() if k in sig.parameters}

        return await handler(**valid_params)

    @staticmethod
    def _calculate_changes(data_points: List[Dict]) -> Dict[str, Any]:
        """Calculate metric changes from data points.

        Args:
            data_points: List of stat snapshots.

        Returns:
            Summary of changes.
        """
        if len(data_points) < 2:
            return {"message": "Not enough data points for comparison"}

        first = data_points[0].get("stats", {})
        last = data_points[-1].get("stats", {})

        changes = {}
        for key in first:
            if isinstance(first.get(key), (int, float)):
                changes[key] = {
                    "start": first[key],
                    "end": last[key],
                    "change": last[key] - first[key],
                    "change_percent": round(
                        ((last[key] - first[key]) / first[key] * 100) if first[key] > 0 else 0, 2
                    ),
                }

        return changes