文件预览

mps_av_understand.py

查看 Tencent MPS 技能包中的文件内容。

文件内容

scripts/mps_av_understand.py

#!/usr/bin/env python3
"""
腾讯云 MPS 大模型音视频理解脚本

功能:
  调用 MPS 大模型音视频理解能力(AiAnalysisTask + VideoComprehension),
  通过提示词控制理解侧重点,实现对视频/音频的内容分析、场景识别、摘要生成等。

  API 文档:https://cloud.tencent.com/document/product/862/126094
  接口说明:ProcessMedia — AiAnalysisTask.Definition=33,ExtendedParameter 传 mvc 参数

核心参数说明:
  --mode   : "video"(理解视频画面+音频)或 "audio"(仅音频,视频会自动提取音频)
  --prompt : 大模型提示词,决定理解侧重点和输出格式(必填,建议明确描述分析目标)
  --extend-url : 第二段音视频 URL,用于对比分析(最多 1 条扩展 URL,即总共 2 个文件参与对比)

用法:
  # 基础:视频内容理解
  python scripts/mps_av_understand.py \\
      --url https://example.com/video.mp4 \\
      --mode video \\
      --prompt "请分析这个视频的主要内容、场景和关键信息"

  # 音频模式(上传视频时自动提取音频)
  python scripts/mps_av_understand.py \\
      --url https://example.com/video.mp4 \\
      --mode audio \\
      --prompt "请对这段音频进行语音识别,输出完整文字内容"

  # 对比分析(两段音视频)
  python scripts/mps_av_understand.py \\
      --url https://example.com/video1.mp4 \\
      --extend-url https://example.com/video2.mp4 \\
      --mode audio \\
      --prompt "请对比这两段音频,分析演奏水平的差异"

  # COS输入(推荐,使用 --cos-input-key)
  python scripts/mps_av_understand.py \\
      --cos-input-key /input/video.mp4 \\
      --mode video \\
      --prompt "总结视频内容"

  # 异步模式(只提交任务,不等待)
  python scripts/mps_av_understand.py \\
      --url https://example.com/video.mp4 \\
      --mode video --prompt "分析视频内容" --no-wait

  # 查询已有任务结果
  python scripts/mps_av_understand.py --task-id 1234567890-WorkflowTask-xxxxx

  # JSON 格式输出
  python scripts/mps_av_understand.py \\
      --url https://example.com/video.mp4 \\
      --mode video --prompt "分析视频内容" --json

  # dry-run 模式(预览参数,不调用 API)
  python scripts/mps_av_understand.py \\
      --url https://example.com/video.mp4 \\
      --mode video --prompt "分析视频内容" --dry-run
"""

import sys
import os
import json
import time
import argparse

_script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, _script_dir)

try:
    from mps_load_env import ensure_env_loaded as _ensure_env_loaded
    _LOAD_ENV_AVAILABLE = True
except ImportError:
    _LOAD_ENV_AVAILABLE = False
try:
    import mps_load_env as _le
    _le.load_env_files()
except Exception:
    pass

try:
    from tencentcloud.common import credential
    from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
    from tencentcloud.mps.v20190612 import mps_client, models
except ImportError:
    print("错误: 未安装腾讯云 SDK,请运行: pip install tencentcloud-sdk-python", file=sys.stderr)
    sys.exit(1)

try:
    from mps_poll_task import auto_upload_local_file, poll_video_task, auto_download_outputs
    _POLL_AVAILABLE = True
except ImportError:
    _POLL_AVAILABLE = False

# ─────────────────────────────────────────────
# 配置
# ─────────────────────────────────────────────
DEFAULT_REGION     = os.environ.get("TENCENTCLOUD_API_REGION", "")
DEFAULT_DEFINITION = 33   # 预设视频理解模板(官方推荐)
DEFAULT_MODE       = "video"
POLL_INTERVAL      = 10   # 轮询间隔(秒)
POLL_TIMEOUT       = 600  # 最大等待时间(秒)


# ─────────────────────────────────────────────
# SDK 客户端
# ─────────────────────────────────────────────

def get_client(region: str = DEFAULT_REGION):
    secret_id  = os.environ.get("TENCENTCLOUD_SECRET_ID", "")
    secret_key = os.environ.get("TENCENTCLOUD_SECRET_KEY", "")
    if not secret_id or not secret_key:
        print("错误: 请设置环境变量 TENCENTCLOUD_SECRET_ID 和 TENCENTCLOUD_SECRET_KEY",
              file=sys.stderr)
        sys.exit(1)
    cred = credential.Credential(secret_id, secret_key)
    return mps_client.MpsClient(cred, region)


# ─────────────────────────────────────────────
# 构建 ExtendedParameter
# ─────────────────────────────────────────────

def build_extended_parameter(
    mode: str,
    prompt: str,
    extend_urls: list = None,
) -> str:
    """
    构建 AiAnalysisTask.ExtendedParameter 的 JSON 字符串。

    结构:
      {"mvc": {"mode": "video|audio", "prompt": "...", "extendData": [{"url": "..."}]}}

    Args:
        mode        : "video" 或 "audio"
        prompt      : 大模型提示词
        extend_urls : 额外对比文件 URL 列表(最多 1 条,即总共 2 个文件)
    """
    mvc: dict = {
        "mode":   mode,
        "prompt": prompt,
    }
    if extend_urls:
        mvc["extendData"] = [{"url": u} for u in extend_urls[:1]]  # 最多 1 条扩展

    return json.dumps({"mvc": mvc}, ensure_ascii=False)


# ─────────────────────────────────────────────
# 创建任务
# ─────────────────────────────────────────────

def create_understand_task(
    client,
    url: str = None,
    cos_input_bucket: str = None,
    cos_input_region: str = None,
    cos_input_key: str = None,
    definition: int = DEFAULT_DEFINITION,
    mode: str = DEFAULT_MODE,
    prompt: str = "",
    extend_urls: list = None,
    region: str = DEFAULT_REGION,
) -> str:
    """
    提交音视频理解任务,返回 TaskId。
    """
    req = models.ProcessMediaRequest()

    # ── 输入源 ──
    input_info = models.MediaInputInfo()
    if url:
        input_info.Type = "URL"
        url_input = models.UrlInputInfo()
        url_input.Url = url
        input_info.UrlInputInfo = url_input
    elif cos_input_key:
        # COS 路径输入
        input_info.Type = "COS"
        cos_input = models.CosInputInfo()
        bucket = cos_input_bucket or os.environ.get("TENCENTCLOUD_COS_BUCKET", "")
        cos_region = cos_input_region or os.environ.get("TENCENTCLOUD_COS_REGION", region)
        if not bucket:
            print("错误: 使用 COS 输入时请指定 --cos-input-bucket 或设置 TENCENTCLOUD_COS_BUCKET 环境变量", file=sys.stderr)
            sys.exit(1)
        cos_input.Bucket = bucket
        cos_input.Region = cos_region
        # 确保 key 以 / 开头
        cos_input.Object = cos_input_key if cos_input_key.startswith("/") else f"/{cos_input_key}"
        input_info.CosInputInfo = cos_input
    else:
        print("错误: 请提供 --url 或 --cos-input-key", file=sys.stderr)
        sys.exit(1)

    req.InputInfo = input_info

    # ── AiAnalysisTask ──
    ai_task = models.AiAnalysisTaskInput()
    ai_task.Definition = definition
    if prompt:  # 有 prompt 才构建 ExtendedParameter
        ai_task.ExtendedParameter = build_extended_parameter(
            mode=mode,
            prompt=prompt,
            extend_urls=extend_urls,
        )
    req.AiAnalysisTask = ai_task

    resp = client.ProcessMedia(req)
    return resp.TaskId


# ─────────────────────────────────────────────
# 查询任务
# ─────────────────────────────────────────────

def query_task(client, task_id: str) -> dict:
    req = models.DescribeTaskDetailRequest()
    req.TaskId = task_id
    resp = client.DescribeTaskDetail(req)
    return json.loads(resp.to_json_string())


def poll_task(client, task_id: str, timeout: int = POLL_TIMEOUT) -> dict:
    start = time.time()
    print(f"⏳ 等待任务完成: {task_id}")
    while True:
        result = query_task(client, task_id)
        status = result.get("Status", "")
        if status in ("FINISH", "FAIL"):
            elapsed = int(time.time() - start)
            print(f"{'✅' if status == 'FINISH' else '❌'} 任务{status},耗时 {elapsed}s")
            return result
        if time.time() - start > timeout:
            print(f"⚠️  超时({timeout}s),任务仍在处理", file=sys.stderr)
            return result
        elapsed = int(time.time() - start)
        print(f"   [{elapsed}s] 状态: {status},继续等待...")
        time.sleep(POLL_INTERVAL)


# ─────────────────────────────────────────────
# 结果解析
# ─────────────────────────────────────────────

def extract_result(task_detail: dict) -> dict:
    """从 WorkflowTask.AiAnalysisResultSet 中提取 VideoComprehension 结果"""
    out = {
        "task_id":     task_detail.get("TaskId", ""),
        "status":      task_detail.get("Status", ""),
        "create_time": task_detail.get("CreateTime", ""),
        "finish_time": task_detail.get("FinishTime", ""),
        "comprehension": None,
        "error": None,
    }

    wf = task_detail.get("WorkflowTask", {}) or {}
    for item in wf.get("AiAnalysisResultSet", []):
        if item.get("Type") == "VideoComprehension":
            vc = item.get("VideoComprehensionTask", {}) or {}
            if vc.get("ErrCode", 0) != 0:
                out["error"] = {"code": vc["ErrCode"], "message": vc.get("Message", "")}
            else:
                task_out = vc.get("Output", {}) or {}
                out["comprehension"] = {
                    "result":      task_out.get("VideoComprehensionAnalysisResult", ""),
                    "progress":    vc.get("Progress", 0),
                    "begin_time":  vc.get("BeginProcessTime", ""),
                    "finish_time": vc.get("FinishTime", ""),
                }
            break
    return out


def print_result(result: dict, as_json: bool = False, output_dir: str = None):
    if as_json:
        print(json.dumps(result, ensure_ascii=False, indent=2))
    else:
        sep = "=" * 60
        print(f"\n{sep}")
        print(f"任务 ID : {result['task_id']}")
        print(f"状态    : {result['status']}")
        print(f"完成时间: {result.get('finish_time', '-')}")
        if result.get("error"):
            err = result["error"]
            print(f"\n❌ 错误: [{err['code']}] {err['message']}")
        elif result.get("comprehension"):
            comp = result["comprehension"]
            print(f"\n✅ 理解结果(进度: {comp['progress']}%)")
            print("─" * 60)
            print(comp["result"])
        else:
            print("\n⚠️  暂无结果(任务可能仍在处理中)")
        print(f"{sep}\n")

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
        fname = f"av_understand_{result['task_id'].replace('/', '_')}.json"
        fpath = os.path.join(output_dir, fname)
        with open(fpath, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)
        print(f"结果已保存: {fpath}")


# ─────────────────────────────────────────────
# CLI 入口
# ─────────────────────────────────────────────

# NOCA:CCN(complex function with multiple execution paths, splitting would reduce readability)
def main():
    parser = argparse.ArgumentParser(
        description="腾讯云 MPS 大模型音视频理解",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # 输入源(三选一)
    input_grp = parser.add_mutually_exclusive_group()
    input_grp.add_argument("--local-file", help="本地文件路径,自动上传到 COS 后处理(需配置 TENCENTCLOUD_COS_BUCKET)")
    input_grp.add_argument("--url",        help="音视频 URL(HTTP/HTTPS)")
    input_grp.add_argument("--task-id",    help="直接查询已有任务结果(跳过创建)")
    
    # COS 路径输入(新版,与 mps_transcode.py 等保持一致)
    parser.add_argument("--cos-input-bucket", help="输入文件所在 COS Bucket 名称")
    parser.add_argument("--cos-input-region", help="输入文件所在 COS Region(如 ap-guangzhou)")
    parser.add_argument("--cos-input-key",    help="输入文件的 COS Key(如 /input/video.mp4)")

    # 核心理解参数
    parser.add_argument(
        "--mode", choices=["video", "audio"], default=DEFAULT_MODE,
        help=(
            "理解模式(默认 video):\n"
            "  video — 理解视频画面内容\n"
            "  audio — 仅处理音频(上传视频时自动提取音频)"
        ),
    )
    parser.add_argument(
        "--prompt", default="",
        help="大模型提示词(强烈建议填写,控制理解侧重点和输出格式)",
    )
    parser.add_argument(
        "--extend-url", metavar="URL", action="append", dest="extend_urls",
        help="第二段对比音视频 URL(可与主输入做对比分析,最多 1 条)",
    )

    # 任务参数
    parser.add_argument(
        "--definition", type=int, default=DEFAULT_DEFINITION,
        help=f"AiAnalysisTask 模板 ID(默认 {DEFAULT_DEFINITION},即预设视频理解模板)",
    )
    parser.add_argument("--region", default=DEFAULT_REGION,
                        help=f"地域(默认 {DEFAULT_REGION})")

    # 输出控制
    parser.add_argument("--no-wait",    action="store_true", help="异步模式:只提交任务,不等待结果")
    parser.add_argument("--json",       action="store_true", dest="json_output", help="JSON 格式输出")
    parser.add_argument("--output-dir", help="将结果 JSON 保存到指定目录")
    parser.add_argument("--dry-run",    action="store_true", help="只打印参数预览,不调用 API")

    parser.add_argument("-v", "--verbose", action="store_true", help="显示详细日志信息")

    args = parser.parse_args()

    # 自动加载环境变量(入口处立即执行)
    if _LOAD_ENV_AVAILABLE:
        try:
            _ensure_env_loaded(verbose=getattr(args, "verbose", False))
        except SystemExit:
            raise
        except Exception as e:
            if getattr(args, "verbose", False):
                print(f"⚠️  自动加载 env 失败:{e}", file=sys.stderr)
    # --url 本地路径自动转换为本地上传模式
    if getattr(args, 'url', None) and not getattr(args, 'local_file', None):
        _val = args.url
        if not _val.startswith('http://') and not _val.startswith('https://'):
            print(f"提示:'{_val}' 未指定来源,默认按本地文件处理", file=sys.stderr)
            args.local_file = _val
            args.url = None

    # --local-file 与 COS 输入参数互斥
    if getattr(args, 'local_file', None):
        cos_conflicts = [x for x in [
            getattr(args, 'cos_input_bucket', None), getattr(args, 'cos_input_key', None)
        ] if x]
        if cos_conflicts:
            parser.error("--local-file 不能与 --cos-input-bucket / --cos-input-key 同时使用")

    # 本地文件自动上传
    if getattr(args, 'local_file', None):
        if not _POLL_AVAILABLE:
            print("错误:--local-file 需要 mps_poll_task 模块支持", file=sys.stderr)
            sys.exit(1)
        upload_result = auto_upload_local_file(args.local_file)
        if not upload_result:
            sys.exit(1)
        args.cos_input_key = upload_result["Key"]
        args.cos_input_bucket = upload_result["Bucket"]
        args.cos_input_region = upload_result["Region"]

    # ── dry-run ──
    if args.dry_run:
        print("🔍 dry-run 参数预览:")
        if args.task_id:
            print(f"  task-id    = {args.task_id}")
        else:
            if args.url:
                print(f"  输入       = URL={args.url}")
            elif args.cos_input_key:
                bucket = args.cos_input_bucket or os.environ.get("TENCENTCLOUD_COS_BUCKET", "")
                region = args.cos_input_region or os.environ.get("TENCENTCLOUD_COS_REGION", args.region)
                print(f"  输入       = COS={bucket}/{region}{args.cos_input_key}")
            print(f"  mode       = {args.mode}")
            print(f"  prompt     = {args.prompt!r}")
            print(f"  extend-url = {args.extend_urls}")
            print(f"  definition = {args.definition}")
            print(f"  region     = {args.region}")
            print(f"  no-wait    = {args.no_wait}")
            print(f"  verbose    = {args.verbose}")
        if args.prompt:
            ep = build_extended_parameter(args.mode, args.prompt, args.extend_urls)
            print(f"\n  ExtendedParameter =\n  {ep}")
        return

    client = get_client(args.region)

    # ── 查询模式 ──
    if args.task_id:
        print(f"🔍 查询任务: {args.task_id}")
        if args.verbose:
            print(f"   客户端区域: {args.region}")
        detail = query_task(client, args.task_id)
        result = extract_result(detail)
        print_result(result, as_json=args.json_output, output_dir=args.output_dir)
        return

    # ── 提交任务 ──
    has_input = bool(args.url) or bool(args.cos_input_key)
    if not has_input:
        parser.error("请提供 --url、--cos-input-key 或 --task-id 之一")

    # 确定输入源显示
    if args.url:
        src = f"URL={args.url}"
    else:
        bucket = args.cos_input_bucket or os.environ.get("TENCENTCLOUD_COS_BUCKET", "")
        region = args.cos_input_region or os.environ.get("TENCENTCLOUD_COS_REGION", args.region)
        src = f"COS={bucket}/{region}{args.cos_input_key}"
    
    print(f"🚀 提交音视频理解任务")
    print(f"   输入     : {src}")
    print(f"   mode     : {args.mode}")
    print(f"   prompt   : {args.prompt[:80]!r}{'...' if len(args.prompt) > 80 else ''}")
    if args.extend_urls:
        print(f"   对比文件 : {args.extend_urls}")
    print(f"   definition: {args.definition}  region: {args.region}")
    
    if args.verbose:
        print(f"\n📋 详细参数:")
        print(f"   bucket   : {args.cos_input_bucket or os.environ.get('TENCENTCLOUD_COS_BUCKET', 'N/A')}")
        print(f"   cos_region: {args.cos_input_region or os.environ.get('TENCENTCLOUD_COS_REGION', args.region)}")
        if args.cos_input_key:
            print(f"   cos_key  : {args.cos_input_key}")
        ep = build_extended_parameter(args.mode, args.prompt, args.extend_urls)
        print(f"   extended_param: {ep}")

    try:
        task_id = create_understand_task(
            client,
            url=args.url,
            cos_input_bucket=args.cos_input_bucket,
            cos_input_region=args.cos_input_region,
            cos_input_key=args.cos_input_key,
            definition=args.definition,
            mode=args.mode,
            prompt=args.prompt,
            extend_urls=args.extend_urls,
            region=args.region,
        )
        print("✅ 音视频理解任务提交成功!")
        print(f"   TaskId: {task_id}")
        print(f"\n## TaskId: {task_id}")
        if args.verbose:
            print(f"   轮询间隔: {POLL_INTERVAL}秒  最大等待: {POLL_TIMEOUT}秒")
    except TencentCloudSDKException as e:
        print(f"❌ 提交失败: {e}", file=sys.stderr)
        sys.exit(1)

    # ── 异步模式 ──
    if args.no_wait:
        result = {
            "task_id": task_id,
            "status":  "PROCESSING",
            "message": "任务已提交,使用 --task-id 查询结果",
        }
        print_result(result, as_json=args.json_output, output_dir=args.output_dir)
        return

    # ── 同步等待 ──
    try:
        if args.verbose:
            print(f"\n⏳ 开始轮询任务状态...")
        detail = poll_task(client, task_id)
        if args.verbose:
            print(f"✅ 任务完成")
        result = extract_result(detail)
        print_result(result, as_json=args.json_output, output_dir=args.output_dir)
        if result.get("status") == "FAIL":
            sys.exit(1)
    except TencentCloudSDKException as e:
        print(f"❌ 查询失败: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()