文件预览

cookie_manager.py

查看 Bilibili Video Transcriber 技能包中的文件内容。

文件内容

cookie_manager.py

#!/usr/bin/env python3
"""
B站 Cookie 安全管理模块

功能:
1. 多路径冗余存储 Cookie,防止误删除
2. 自动检测 Cookie 有效性
3. 扫码登录自动续签
4. 失效时生成通知并通过飞书/控制台发送二维码
"""

import os
import sys
import json
import time
import shutil
import logging
import requests
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, Callable

logger = logging.getLogger(__name__)

# Cookie 多路径冗余存储
COOKIE_STORE_PATHS = [
    # 1. 主存储(技能目录内)
    os.path.expanduser("~/.openclaw/workspace/skills/bilibili-video-transcriber/.bilibili_cookie"),
    # 2. 备用存储(用户主目录)
    os.path.expanduser("~/.bilibili_cookie_storage"),
    # 3. 系统级(/etc 下,需要 root 但尝试)
    os.path.expanduser("~/.config/bilibili_transcriber/cookie"),
]

# 活跃引用(供运行时读取)
COOKIE_ACTIVE_PATH = os.path.expanduser("~/.bilibili_cookie.txt")

# Cookie 检查环境变量通知通道
# 当 cookie_manager 需要通知用户时,会检查该文件中的回调配置
NOTIFY_CONFIG_PATH = os.path.expanduser(
    "~/.openclaw/workspace/skills/bilibili-video-transcriber/.cookie_notify_config.json"
)


def ensure_cookie_dirs():
    """确保所有存储路径的目录存在"""
    for p in [COOKIE_ACTIVE_PATH] + COOKIE_STORE_PATHS:
        d = os.path.dirname(p)
        os.makedirs(d, exist_ok=True)


def save_cookie(cookie_str: str) -> bool:
    """
    安全保存 Cookie 到多个冗余路径
    
    Args:
        cookie_str: Cookie 字符串
        
    Returns:
        是否至少保存到一个路径成功
    """
    ensure_cookie_dirs()
    
    # 1. 保存到活跃路径(主引用)
    success = False
    try:
        # 先写到临时文件再重命名,防止写入中断导致文件损坏
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(COOKIE_ACTIVE_PATH))
        os.close(fd)
        with open(tmp_path, 'w', encoding='utf-8') as f:
            f.write(cookie_str.strip())
        os.chmod(tmp_path, 0o600)  # 仅所有者可读写
        shutil.move(tmp_path, COOKIE_ACTIVE_PATH)
        logger.info(f"✅ Cookie 已保存到 {COOKIE_ACTIVE_PATH}")
        success = True
    except Exception as e:
        logger.warning(f"❌ 写入 {COOKIE_ACTIVE_PATH} 失败: {e}")
    
    # 2. 同步到所有冗余存储路径
    for store_path in COOKIE_STORE_PATHS:
        try:
            d = os.path.dirname(store_path)
            os.makedirs(d, exist_ok=True)
            fd, tmp_path = tempfile.mkstemp(dir=d)
            os.close(fd)
            with open(tmp_path, 'w', encoding='utf-8') as f:
                f.write(cookie_str.strip())
            os.chmod(tmp_path, 0o600)
            shutil.move(tmp_path, store_path)
            logger.info(f"✅ Cookie 已备份到 {store_path}")
            success = True
        except Exception as e:
            logger.warning(f"❌ 备份到 {store_path} 失败: {e}")
    
    return success


def load_cookie() -> Optional[str]:
    """
    从任意可用路径加载 Cookie(依次尝试所有路径)
    
    搜索顺序: 活跃文件 > 技能目录 > 用户目录 > 配置目录
    """
    search_paths = [COOKIE_ACTIVE_PATH] + COOKIE_STORE_PATHS
    
    for path in search_paths:
        expanded = os.path.expanduser(path)
        if os.path.exists(expanded):
            try:
                with open(expanded, 'r', encoding='utf-8') as f:
                    content = f.read().strip()
                if content:
                    # 如果是从备份路径读到的,自动补全到活跃路径
                    if path != COOKIE_ACTIVE_PATH and not os.path.exists(COOKIE_ACTIVE_PATH):
                        try:
                            os.makedirs(os.path.dirname(COOKIE_ACTIVE_PATH), exist_ok=True)
                            shutil.copy2(expanded, COOKIE_ACTIVE_PATH)
                            logger.info(f"♻️ 已从 {expanded} 恢复 Cookie 到 {COOKIE_ACTIVE_PATH}")
                        except Exception:
                            pass
                    return content
            except Exception as e:
                logger.debug(f"读取 {path} 失败: {e}")
    
    # 也检查环境变量
    env_cookie = os.environ.get('BILIBILI_COOKIE')
    if env_cookie:
        logger.info("📦 从环境变量加载 Cookie")
        return env_cookie
    
    return None


def parse_cookie(cookie_str: str) -> Dict[str, str]:
    """解析 Cookie 字符串为字典"""
    cookies = {}
    if not cookie_str:
        return cookies
    for item in cookie_str.split(';'):
        item = item.strip()
        if '=' in item:
            k, v = item.split('=', 1)
            cookies[k.strip()] = v.strip()
    return cookies


def check_cookie_valid(cookie_str: str) -> Dict[str, Any]:
    """
    检查 Cookie 是否有效
    
    Returns:
        {
            "valid": bool,
            "username": str or None,
            "is_vip": bool,
            "error": str or None
        }
    """
    if not cookie_str:
        return {"valid": False, "username": None, "is_vip": False, "error": "cookie为空"}
    
    try:
        from bilibili_api import Credential, sync
        from bilibili_api import user as bilibili_user
        
        cookies = parse_cookie(cookie_str)
        
        credential = Credential(
            sessdata=cookies.get('SESSDATA', ''),
            bili_jct=cookies.get('bili_jct', ''),
            buvid3=cookies.get('buvid3', '')
        )
        
        # User 需要 uid 参数,但有个简化的方式
        # 直接调用 get_self_info 或者使用已弃用的方式
        try:
            from bilibili_api import user as bilibili_user
            u = bilibili_user.User(credential=credential)
            info = sync(u.get_user_info())
            return {
                "valid": True,
                "username": info.get('name', 'Unknown'),
                "is_vip": info.get('vipStatus', 0) == 1,
                "error": None
            }
        except TypeError as te:
            # 新版 API 可能需要 uid
            uid = cookies.get('DedeUserID', '0')
            if uid and uid != '0':
                u = bilibili_user.User(uid=int(uid), credential=credential)
                info = sync(u.get_user_info())
                return {
                    "valid": True,
                    "username": info.get('name', 'Unknown'),
                    "is_vip": info.get('vipStatus', 0) == 1,
                    "error": None
                }
            raise
    except Exception as e:
        err_msg = str(e)
        # 常见 Cookie 失效错误
        if "412" in err_msg or "Precondition" in err_msg:
            err_msg = "Cookie 可能已过期或被风控"
        elif "401" in err_msg or "Unauthorized" in err_msg:
            err_msg = "Cookie 已失效"
        return {"valid": False, "username": None, "is_vip": False, "error": err_msg}


class BilibiliQRLogin:
    """B站扫码登录 - 支持通过飞书发送二维码"""

    def __init__(self):
        self.qrcode_key = None
        self.qrcode_url = None
        self.qr_image_path = None

    def generate_qr_code(self) -> str:
        """
        生成二维码
        
        Returns:
            二维码图片路径
        """
        import qrcode

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }

        r = requests.get(
            "https://passport.bilibili.com/x/passport-login/web/qrcode/generate",
            headers=headers,
            timeout=15
        )
        data = r.json()

        if data.get('code') != 0:
            raise Exception(f"生成二维码失败: {data.get('message', '未知错误')}")

        self.qrcode_key = data['data']['qrcode_key']
        self.qrcode_url = data['data']['url']

        # 生成二维码图片
        qr = qrcode.QRCode(box_size=8, border=3)
        qr.add_data(self.qrcode_url)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")

        qr_path = "/tmp/bilibili_qr_login.png"
        img.save(qr_path)
        self.qr_image_path = qr_path

        logger.info(f"🔑 二维码已生成: {qr_path}")
        logger.info(f"🔗 URL: {self.qrcode_url}")
        return qr_path

    def poll_login(self, timeout_seconds: int = 180, callback: Optional[Callable] = None) -> Dict[str, Any]:
        """
        轮询登录状态,有结果时调用回调
        
        Args:
            timeout_seconds: 超时时间(秒)
            callback: 结果回调函数,会在登录成功时被调用
            
        Returns:
            {"success": bool, "cookie": str, "username": str, ...}
        """
        if not self.qrcode_key:
            return {"success": False, "error": "未生成二维码"}

        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }

        poll_url = "https://passport.bilibili.com/x/passport-login/web/qrcode/poll"
        
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            try:
                r = requests.post(
                    poll_url,
                    params={'qrcode_key': self.qrcode_key},
                    headers=headers,
                    timeout=10
                )
                result = r.json()
                
                code = result['data']['code']
                
                if code == 0:
                    # 登录成功,从 Set-Cookie 提取
                    all_cookies = {}
                    # 从 headers 提取
                    for k, v in r.headers.items():
                        if k.lower() == 'set-cookie':
                            parts = v.split(';')
                            for part in parts:
                                if '=' in part:
                                    ck, cv = part.split('=', 1)
                                    all_cookies[ck.strip()] = cv.strip()
                    # 从 cookies 对象提取
                    for k, v in r.cookies.items():
                        all_cookies[k] = v
                    
                    # 提取关键 cookie
                    cookie_parts = []
                    for ck in ['SESSDATA', 'bili_jct', 'buvid3', 'DedeUserID', 'DedeUserID__ckMd5', 'sid']:
                        if ck in all_cookies:
                            cookie_parts.append(f"{ck}={all_cookies[ck]}")
                    
                    if not cookie_parts:
                        # fallback: 从 data 中获取
                        data = result.get('data', {})
                        if 'url' in data:
                            import urllib.parse
                            parsed = urllib.parse.urlparse(data['url'])
                            params = urllib.parse.parse_qs(parsed.query)
                            for ck in ['SESSDATA', 'bili_jct', 'DedeUserID']:
                                if ck in params:
                                    cookie_parts.append(f"{ck}={params[ck][0]}")
                    
                    cookie_str = '; '.join(cookie_parts) if cookie_parts else ""
                    result_data = {"success": bool(cookie_str), "cookie": cookie_str}
                    
                    # 验证
                    if cookie_str:
                        check = check_cookie_valid(cookie_str)
                        result_data["username"] = check.get("username")
                        result_data["is_vip"] = check.get("is_vip")
                        result_data["valid"] = check["valid"]
                        
                        if check["valid"]:
                            # 自动保存
                            save_cookie(cookie_str)
                    
                    if callback:
                        callback(result_data)
                    
                    return result_data
                    
                elif code == 86038:
                    return {"success": False, "error": "二维码已过期"}
                elif code == 86090:
                    # 已扫码但未确认
                    pass
                    
            except Exception as e:
                logger.debug(f"轮询异常: {e}")
            
            time.sleep(2)
        
        return {"success": False, "error": "扫码超时"}
    
    def poll_once(self) -> Dict[str, Any]:
        """
        只轮询一次,不循环
        
        Returns:
            {"success": bool, ...} 或 {"polling": True}
        """
        if not self.qrcode_key:
            return {"success": False, "error": "未生成二维码"}
        
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        }
        
        try:
            r = requests.post(
                "https://passport.bilibili.com/x/passport-login/web/qrcode/poll",
                params={'qrcode_key': self.qrcode_key},
                headers=headers,
                timeout=10
            )
            result = r.json()
            code = result['data']['code']
            
            if code == 0:
                # 登录成功
                all_cookies = {}
                for k, v in r.headers.items():
                    if k.lower() == 'set-cookie':
                        parts = v.split(';')
                        for part in parts:
                            if '=' in part:
                                ck, cv = part.split('=', 1)
                                all_cookies[ck.strip()] = cv.strip()
                for k, v in r.cookies.items():
                    all_cookies[k] = v
                
                cookie_parts = []
                for ck in ['SESSDATA', 'bili_jct', 'buvid3', 'DedeUserID']:
                    if ck in all_cookies:
                        cookie_parts.append(f"{ck}={all_cookies[ck]}")
                
                cookie_str = '; '.join(cookie_parts)
                if cookie_str:
                    save_cookie(cookie_str)
                    check = check_cookie_valid(cookie_str)
                    return {
                        "success": True,
                        "cookie": cookie_str,
                        "username": check.get("username"),
                        "is_vip": check.get("is_vip"),
                    }
                
            elif code == 86038:
                return {"success": False, "error": "二维码已过期"}
            
            return {"polling": True, "code": code}
            
        except Exception as e:
            return {"success": False, "error": str(e)}


def ensure_cookie(force_login: bool = False) -> bool:
    """
    确保 Cookie 可用,如不可用则提示用户
    
    Args:
        force_login: 是否强制重新登录
        
    Returns:
        Cookie 是否可用
    """
    # 1. 尝试加载现存 Cookie
    cookie_str = load_cookie()
    
    if cookie_str and not force_login:
        # 2. 验证有效性
        result = check_cookie_valid(cookie_str)
        if result["valid"]:
            logger.info(f"✅ Cookie 有效 (用户: {result['username']})")
            return True
        else:
            logger.warning(f"⚠️ Cookie 已失效: {result['error']}")
    
    # 3. Cookie 无效或强制登录,生成二维码
    logger.info("📱 请扫码登录 B 站...")
    return False


def needs_cookie_refresh() -> Optional[str]:
    """
    检查是否需要刷新 Cookie
    
    Returns:
        None = Cookie 有效,str = 过期原因
    """
    cookie_str = load_cookie()
    if not cookie_str:
        return "Cookie 文件不存在"
    
    result = check_cookie_valid(cookie_str)
    if not result["valid"]:
        return result.get("error", "Cookie 已失效")
    
    return None


def get_stored_cookie() -> Optional[str]:
    """获取已存储的 Cookie(带有效性检查和自动恢复)"""
    return load_cookie()


# ========== 飞书发送二维码支持 ==========
# 当 cookie_manager 需要在飞书上发送二维码时,外部调用此函数

def send_qr_via_feishu(qr_image_path: str) -> bool:
    """
    通过飞书发送二维码图片
    
    返回值: 是否成功发送
    注意:此函数需要外部提供 feishu 发送能力,这里用文件标记方式
    """
    # 签名文件 - 代理处理器会检查此文件并执行实际的发送
    signal_path = "/tmp/.bilibili_qr_send_signal.json"
    signal = {
        "action": "send_qr",
        "image_path": qr_image_path,
        "timestamp": time.time(),
        "message": "请使用B站APP扫描此二维码登录,扫码后请回复「已扫码」"
    }
    try:
        with open(signal_path, 'w') as f:
            json.dump(signal, f)
        logger.info(f"📨 二维码发送信号已写入: {signal_path}")
        return True
    except Exception as e:
        logger.error(f"写入发送信号失败: {e}")
        return False


# 初始化
ensure_cookie_dirs()