文件预览

downloader.py

查看 Bilibili All In One 技能包中的文件内容。

文件内容

src/downloader.py

"""Bilibili video downloading module."""

import os
import shutil
import asyncio
from typing import Optional, Dict, Any, List

import httpx

from .auth import BilibiliAuth
from .utils import (
    API_VIDEO_INFO,
    API_PLAY_URL,
    QUALITY_MAP,
    DEFAULT_HEADERS,
    extract_bvid,
    format_duration,
    format_number,
    ensure_dir,
    sanitize_filename,
)


# -----------------------------------------------------------------------------
# Security boundary for the one subprocess call in this module (`ffmpeg`):
#
#   * We NEVER use `shell=True`, `os.system`, or `os.popen`.
#   * We resolve the ffmpeg binary via `shutil.which("ffmpeg")` before each
#     invocation and pass it to the subprocess call as an absolute path, so
#     no PATH lookup happens at the exec boundary.
#   * Every path argument passed to ffmpeg in `_merge_streams` is a local file
#     path built by *this process*: the output directory, sanitized name
#     components (sanitize_filename applied to `video.title` and
#     `pages[i].part`), the requested format as the file extension, and
#     constant suffixes like `.video.tmp` / `.audio.tmp`. Stream URLs, cookies
#     and raw remote responses are never placed in the argv.
#   * The subprocess is spawned with a minimal environment and its stdout /
#     stderr are discarded, so it cannot be coerced into emitting or reading
#     sensitive data.
# -----------------------------------------------------------------------------


def _resolve_ffmpeg() -> Optional[str]:
    """Find the host's `ffmpeg` executable.

    The lookup goes through `shutil.which`; any hit is converted to an
    absolute path so that `_merge_streams` can hand a fully-qualified
    executable to `create_subprocess_exec`.

    Returns:
        Absolute path of the ffmpeg binary, or None when it is not found.
    """
    located = shutil.which("ffmpeg")
    if not located:
        return None
    return os.path.abspath(located)


class BilibiliDownloader:
    """Download videos from Bilibili.

    Supports multiple quality options, format selection, and batch downloading.
    """

    def __init__(self, auth: Optional[BilibiliAuth] = None, output_dir: str = "./downloads"):
        """Initialize BilibiliDownloader.

        Args:
            auth: Optional BilibiliAuth instance for authenticated requests.
            output_dir: Default output directory for downloaded files.
        """
        self.auth = auth
        self.output_dir = output_dir

    def _get_client(self) -> httpx.AsyncClient:
        """Get an HTTP client, using auth if available."""
        if self.auth:
            return self.auth.get_client()
        return httpx.AsyncClient(
            headers=DEFAULT_HEADERS,
            timeout=60.0,
            follow_redirects=True,
        )

    async def get_info(self, url: str) -> Dict[str, Any]:
        """Fetch metadata for a single video.

        Args:
            url: Bilibili video URL or BV number.

        Returns:
            On success, a dict with ``success=True`` plus the video's ids,
            title, description, cover, formatted duration, author, stats,
            page list and canonical URL. On failure (unparseable input or a
            non-zero API code), a dict with ``success=False`` and a
            ``message``.
        """
        bvid = extract_bvid(url)
        if not bvid:
            return {"success": False, "message": f"Invalid URL or BV number: {url}"}

        async with self._get_client() as client:
            response = await client.get(API_VIDEO_INFO, params={"bvid": bvid})
            payload = response.json()

        if payload.get("code") != 0:
            return {"success": False, "message": payload.get("message", "API error")}

        info = payload["data"]
        counters = info.get("stat", {})
        uploader = info.get("owner", {})

        # One summary entry per part of a multi-part video.
        page_list = [
            {
                "page": entry.get("page"),
                "cid": entry.get("cid"),
                "title": entry.get("part"),
                "duration": format_duration(entry.get("duration", 0)),
            }
            for entry in info.get("pages", [])
        ]

        canonical_bvid = info.get("bvid")
        total_duration = format_duration(info.get("duration", 0))
        author = {
            "mid": uploader.get("mid"),
            "name": uploader.get("name"),
        }
        stats = {
            "views": format_number(counters.get("view", 0)),
            "likes": format_number(counters.get("like", 0)),
            "coins": format_number(counters.get("coin", 0)),
            "favorites": format_number(counters.get("favorite", 0)),
            # The danmaku count is passed through unformatted.
            "danmaku": counters.get("danmaku", 0),
        }

        return {
            "success": True,
            "bvid": canonical_bvid,
            "aid": info.get("aid"),
            "title": info.get("title"),
            "description": info.get("desc"),
            "cover": info.get("pic"),
            "duration": total_duration,
            "author": author,
            "stats": stats,
            "pages": page_list,
            "url": f"https://www.bilibili.com/video/{canonical_bvid}",
        }

    async def get_formats(self, url: str) -> Dict[str, Any]:
        """Get available download formats and qualities for a video.

        The first page's cid is looked up through the video-info API and then
        used to query the play-URL API for the accepted quality numbers.

        Args:
            url: Bilibili video URL or BV number.

        Returns:
            On success, a dict with ``success=True``, the ``bvid``, a list of
            ``available_qualities`` (``{"quality": name, "qn": number}``) and
            the supported ``formats``. On failure (unparseable input, a
            non-zero API code, or a video without pages), a dict with
            ``success=False`` and a ``message``.
        """
        bvid = extract_bvid(url)
        if not bvid:
            return {"success": False, "message": f"Invalid URL or BV number: {url}"}

        # One client serves both requests instead of opening a second one.
        async with self._get_client() as client:
            # First get video info to get cid
            resp = await client.get(API_VIDEO_INFO, params={"bvid": bvid})
            data = resp.json()

            if data.get("code") != 0:
                return {"success": False, "message": data.get("message", "API error")}

            # Read the page list defensively: a missing or empty list is
            # reported as a failure instead of raising IndexError/KeyError.
            pages = (data.get("data") or {}).get("pages") or []
            cid = pages[0].get("cid") if pages else None
            if cid is None:
                return {"success": False, "message": "Video has no pages"}

            # Get play URL to see available qualities
            resp = await client.get(
                API_PLAY_URL,
                params={
                    "bvid": bvid,
                    "cid": cid,
                    "fnval": 4048,
                    "fourk": 1,
                },
            )
            play_data = resp.json()

        if play_data.get("code") != 0:
            return {"success": False, "message": play_data.get("message", "API error")}

        play_info = play_data.get("data") or {}
        # Invert QUALITY_MAP so quality numbers map back to their names.
        quality_names = {v: k for k, v in QUALITY_MAP.items()}

        available_qualities = [
            {"quality": quality_names.get(qn, f"qn_{qn}"), "qn": qn}
            for qn in play_info.get("accept_quality") or []
        ]

        return {
            "success": True,
            "bvid": bvid,
            "available_qualities": available_qualities,
            "formats": ["mp4", "flv", "mp3"],
        }

    async def download(
        self,
        url: str,
        quality: str = "1080p",
        output_dir: Optional[str] = None,
        format: str = "mp4",
        page: int = 1,
    ) -> Dict[str, Any]:
        """Download a single video.

        Args:
            url: Bilibili video URL or BV number.
            quality: Desired quality ('360p', '480p', '720p', '1080p', '1080p+', '4k').
            output_dir: Output directory (uses default if not specified).
            format: Output format ('mp4', 'flv', 'mp3').
            page: Page/episode number for multi-part videos.

        Returns:
            Download result dict with file path.
        """
        bvid = extract_bvid(url)
        if not bvid:
            return {"success": False, "message": f"Invalid URL or BV number: {url}"}

        out_dir = ensure_dir(output_dir or self.output_dir)
        qn = QUALITY_MAP.get(quality, 80)

        # Get video info
        async with self._get_client() as client:
            resp = await client.get(API_VIDEO_INFO, params={"bvid": bvid})
            data = resp.json()

        if data.get("code") != 0:
            return {"success": False, "message": data.get("message", "API error")}

        video = data["data"]
        title = sanitize_filename(video.get("title", bvid))

        pages = video.get("pages", [])
        if page > len(pages):
            return {"success": False, "message": f"Page {page} not found, video has {len(pages)} pages"}

        cid = pages[page - 1]["cid"]
        page_title = pages[page - 1].get("part", "")

        if len(pages) > 1 and page_title:
            filename = f"{title}_P{page}_{sanitize_filename(page_title)}.{format}"
        else:
            filename = f"{title}.{format}"

        filepath = os.path.join(out_dir, filename)

        # Get download URL
        async with self._get_client() as client:
            resp = await client.get(
                API_PLAY_URL,
                params={
                    "bvid": bvid,
                    "cid": cid,
                    "qn": qn,
                    "fnval": 4048 if format != "flv" else 0,
                    "fourk": 1,
                },
            )
            play_data = resp.json()

        if play_data.get("code") != 0:
            return {"success": False, "message": play_data.get("message", "API error")}

        # Extract download URLs from DASH or legacy format
        dash_data = play_data.get("data", {}).get("dash")
        if dash_data and format != "flv":
            video_url = self._select_dash_stream(dash_data.get("video", []), qn)
            audio_url = self._select_dash_audio(dash_data.get("audio", []))

            if not video_url:
                return {"success": False, "message": "No suitable video stream found"}

            if format == "mp3":
                # Audio only
                if not audio_url:
                    return {"success": False, "message": "No audio stream found"}
                filepath = filepath.replace(f".{format}", ".mp3")
                await self._download_stream(audio_url, filepath)
            else:
                # Download video and audio separately, then combine
                video_tmp = filepath + ".video.tmp"
                audio_tmp = filepath + ".audio.tmp"

                await asyncio.gather(
                    self._download_stream(video_url, video_tmp),
                    self._download_stream(audio_url, audio_tmp) if audio_url else asyncio.sleep(0),
                )

                if audio_url and os.path.exists(audio_tmp):
                    # Merge video and audio (requires ffmpeg)
                    merge_result = await self._merge_streams(video_tmp, audio_tmp, filepath)
                    # Clean up temp files
                    for tmp in [video_tmp, audio_tmp]:
                        if os.path.exists(tmp):
                            os.remove(tmp)
                    if not merge_result:
                        # Fallback: rename video file
                        os.rename(video_tmp, filepath)
                else:
                    os.rename(video_tmp, filepath)
        else:
            # Legacy FLV format
            durl = play_data.get("data", {}).get("durl", [])
            if not durl:
                return {"success": False, "message": "No download URL found"}
            await self._download_stream(durl[0]["url"], filepath)

        file_size = os.path.getsize(filepath) if os.path.exists(filepath) else 0
        return {
            "success": True,
            "bvid": bvid,
            "title": video.get("title"),
            "quality": quality,
            "format": format,
            "filepath": filepath,
            "file_size": file_size,
            "file_size_mb": round(file_size / (1024 * 1024), 2),
        }

    async def batch_download(
        self,
        urls: List[str],
        quality: str = "1080p",
        output_dir: Optional[str] = None,
        format: str = "mp4",
    ) -> Dict[str, Any]:
        """Download multiple videos.

        Args:
            urls: List of Bilibili video URLs or BV numbers.
            quality: Desired quality.
            output_dir: Output directory.
            format: Output format.

        Returns:
            Batch download results.
        """
        results = []
        for url in urls:
            result = await self.download(
                url=url,
                quality=quality,
                output_dir=output_dir,
                format=format,
            )
            results.append(result)

        succeeded = sum(1 for r in results if r.get("success"))
        return {
            "success": True,
            "total": len(urls),
            "succeeded": succeeded,
            "failed": len(urls) - succeeded,
            "results": results,
        }

    async def execute(self, action: str, **kwargs) -> Dict[str, Any]:
        """Execute a downloader action.

        Args:
            action: Action name ('download', 'get_info', 'get_formats', 'batch_download').
            **kwargs: Additional parameters for the action.

        Returns:
            Action result dict.
        """
        actions = {
            "download": self.download,
            "get_info": self.get_info,
            "get_formats": self.get_formats,
            "batch_download": self.batch_download,
        }

        handler = actions.get(action)
        if not handler:
            return {"success": False, "message": f"Unknown action: {action}"}

        import inspect
        sig = inspect.signature(handler)
        valid_params = {k: v for k, v in kwargs.items() if k in sig.parameters}

        return await handler(**valid_params)

    async def _download_stream(self, url: str, filepath: str) -> None:
        """Download a stream URL to a file.

        The request is sent with DEFAULT_HEADERS plus a bilibili.com Referer,
        and the body is written to disk in 1 MiB chunks.

        Args:
            url: Stream URL to download.
            filepath: Destination file path.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx
                status. Nothing is written to ``filepath`` in that case.
        """
        headers = DEFAULT_HEADERS.copy()
        headers["Referer"] = "https://www.bilibili.com"

        async with httpx.AsyncClient(
            headers=headers,
            timeout=300.0,
            follow_redirects=True,
        ) as client:
            async with client.stream("GET", url) as resp:
                # Check the status before opening the file, so an error page
                # (e.g. 403 for an expired URL) is never saved as media.
                resp.raise_for_status()
                with open(filepath, "wb") as f:
                    async for chunk in resp.aiter_bytes(chunk_size=1024 * 1024):
                        f.write(chunk)

    @staticmethod
    async def _merge_streams(video_path: str, audio_path: str, output_path: str) -> bool:
        """Merge two locally-downloaded media files with ffmpeg.

        Security contract (see module-level comment at the top of this file):
          * The three paths MUST be strings that this process has just written
            to — they are produced from `sanitize_filename()` + constant
            suffixes, never from raw user input or remote responses.
          * `shell=True` is never used; arguments are passed as a fixed
            positional argv list to `asyncio.create_subprocess_exec`, which
            invokes execve() directly without an intermediate shell.
          * The ffmpeg binary is resolved to an absolute path via
            `shutil.which` to neutralize PATH-injection attacks.
          * The child is spawned with a minimal environment (PATH only) and
            with stdout/stderr suppressed.
          * A hard timeout caps any runaway ffmpeg invocation.

        Args:
            video_path: Path to the downloaded video-only stream (local file).
            audio_path: Path to the downloaded audio-only stream (local file).
            output_path: Path to write the muxed output file.

        Returns:
            True if merge succeeded, False otherwise (missing ffmpeg, non-zero
            exit, timeout, or any path validation failure).
        """
        # Defensive type check: refuse anything that isn't a plain string.
        # This prevents accidental PathLike objects from smuggling in
        # os.fsencode()-able non-string payloads.
        for p in (video_path, audio_path, output_path):
            if not isinstance(p, str):
                return False
            # Reject null bytes — execve() would reject them anyway, but we
            # fail fast with a clear signal.
            if "\x00" in p:
                return False

        ffmpeg = _resolve_ffmpeg()
        if ffmpeg is None:
            return False

        # Normalize to absolute paths so ffmpeg cannot be tricked by a
        # relative-path + cwd combination.
        video_abs = os.path.abspath(video_path)
        audio_abs = os.path.abspath(audio_path)
        output_abs = os.path.abspath(output_path)

        # Minimal environment: ffmpeg only needs PATH to locate its own
        # codec plugins on some distros. Everything else (HOME, LD_*, etc.)
        # is withheld.
        safe_env = {"PATH": os.environ.get("PATH", "/usr/bin:/bin")}

        try:
            proc = await asyncio.create_subprocess_exec(
                ffmpeg,
                "-y",                      # overwrite output without prompt
                "-nostdin",                # never read from stdin
                "-loglevel", "error",
                "-i", video_abs,
                "-i", audio_abs,
                "-c", "copy",
                output_abs,
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.DEVNULL,
                env=safe_env,
            )
        except (FileNotFoundError, OSError):
            return False

        try:
            # Hard cap at 30 minutes to contain a runaway / hung process.
            await asyncio.wait_for(proc.wait(), timeout=30 * 60)
        except asyncio.TimeoutError:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
            return False

        return proc.returncode == 0

    @staticmethod
    def _select_dash_stream(streams: List[Dict], target_qn: int) -> Optional[str]:
        """Select the best matching DASH video stream.

        Args:
            streams: List of available video streams.
            target_qn: Target quality number.

        Returns:
            Stream URL or None.
        """
        if not streams:
            return None

        # Sort by quality (descending) and find best match
        sorted_streams = sorted(streams, key=lambda s: s.get("id", 0), reverse=True)

        # Try exact match first
        for s in sorted_streams:
            if s.get("id") == target_qn:
                return s.get("baseUrl") or s.get("base_url")

        # Fall back to the best available that doesn't exceed target
        for s in sorted_streams:
            if s.get("id", 0) <= target_qn:
                return s.get("baseUrl") or s.get("base_url")

        # If nothing below target, return lowest available
        return sorted_streams[-1].get("baseUrl") or sorted_streams[-1].get("base_url")

    @staticmethod
    def _select_dash_audio(streams: List[Dict]) -> Optional[str]:
        """Select the best DASH audio stream.

        Args:
            streams: List of available audio streams.

        Returns:
            Stream URL or None.
        """
        if not streams:
            return None

        # Sort by bandwidth (descending) and pick the best
        sorted_streams = sorted(streams, key=lambda s: s.get("bandwidth", 0), reverse=True)
        return sorted_streams[0].get("baseUrl") or sorted_streams[0].get("base_url")