文件内容
scripts/mps_get_video_task.py
#!/usr/bin/env python3
"""
腾讯云 MPS 媒体处理任务查询脚本
功能:
通过任务 ID 查询 ProcessMedia 提交的媒体处理任务的执行状态和结果详情。
最多可以查询 7 天之内提交的任务。
支持查询任务的整体状态(WAITING / PROCESSING / FINISH),以及各子任务
(转码、截图、字幕、画质增强等)的执行结果和输出文件信息。
用法:
# 查询指定任务
python mps_get_video_task.py --task-id 1234567890-WorkflowTask-80108cc3380155d98b2e3573a48a
# 查询并输出完整 JSON 响应
python mps_get_video_task.py --task-id 1234567890-WorkflowTask-80108cc3380155d98b2e3573a48a --verbose
# 仅输出原始 JSON(方便管道处理)
python mps_get_video_task.py --task-id 1234567890-WorkflowTask-80108cc3380155d98b2e3573a48a --json
环境变量:
TENCENTCLOUD_SECRET_ID - 腾讯云 SecretId
TENCENTCLOUD_SECRET_KEY - 腾讯云 SecretKey
"""
import argparse
import json
import os
import sys
try:
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.mps.v20190612 import mps_client, models
except ImportError:
print("错误:请先安装腾讯云 SDK:pip install tencentcloud-sdk-python", file=sys.stderr)
sys.exit(1)
try:
from qcloud_cos import CosConfig, CosS3Client
_COS_SDK_AVAILABLE = True
except ImportError:
_COS_SDK_AVAILABLE = False
try:
from mps_load_env import ensure_env_loaded as _ensure_env_loaded
_LOAD_ENV_AVAILABLE = True
except ImportError:
_LOAD_ENV_AVAILABLE = False
def _ensure_env_loaded(**kwargs):
return False
# Maps task status codes to the Chinese labels shown in the report.
# Codes that are not listed here are displayed unchanged by format_status().
STATUS_MAP = {
    "WAITING": "等待中",
    "PROCESSING": "处理中",
    "FINISH": "已完成",
    "SUCCESS": "成功",
    "FAIL": "失败",
}
# Maps sub-task type identifiers to their Chinese display names.
TASK_TYPE_MAP = {
    "Transcode": "转码",
    "AnimatedGraphics": "转动图",
    "SnapshotByTimeOffset": "时间点截图",
    "SampleSnapshot": "采样截图",
    "ImageSprites": "雪碧图",
    "AdaptiveDynamicStreaming": "自适应码流",
    "AudioExtract": "音频分离",
    "CoverBySnapshot": "截图做封面",
    "AiAnalysis": "AI 内容分析",
    "AiRecognition": "AI 内容识别",
    "AiContentReview": "AI 内容审核",
    "AiQualityControl": "媒体质检",
    "SmartSubtitles": "智能字幕",
    "SmartErase": "智能擦除",
    "Classification": "分类",
    "Cover": "封面",
    "Cutout": "抠图",
    "DeLogo": "去台标",
    "Description": "描述",
    "Dubbing": "配音",
    "FrameTag": "帧标签",
    "HeadTail": "片头片尾",
    "Highlight": "精彩集锦",
    "HorizontalToVertical": "横转竖",
    "Reel": "智能二创",
    "Segment": "分镜拆条",
    "Tag": "标签",
    "VideoComprehension": "视频理解",
    "VideoRemake": "视频二创/去重",
    "Face": "人脸识别",
    "Asr": "语音识别",
    "AsrFullText": "语音全文识别",
    "AsrWords": "语音分词识别",
    "Ocr": "文字识别",
    "OcrFullText": "文字全文识别",
    "OcrWords": "文字分词识别",
    "Object": "物体识别",
    "TransText": "语音翻译",
    "Porn": "涉黄审核",
    "Terrorism": "涉恐审核",
    "Political": "涉政审核",
    "Prohibited": "违禁审核",
    "PoliticalAsr": "涉政语音审核",
    "PoliticalOcr": "涉政文字审核",
    "PornAsr": "涉黄语音审核",
    "PornOcr": "涉黄文字审核",
    "ProhibitedAsr": "违禁语音审核",
    "ProhibitedOcr": "违禁文字审核",
    "TerrorismOcr": "涉恐文字审核",
}
def get_credentials():
    """Build a Tencent Cloud credential object from environment variables.

    Reads ``TENCENTCLOUD_SECRET_ID`` and ``TENCENTCLOUD_SECRET_KEY``. When
    either one is missing and the optional ``mps_load_env`` helper was
    imported, the helper is asked to load them and the environment is read
    a second time.

    Returns:
        The result of ``credential.Credential(secret_id, secret_key)``.

    Raises:
        SystemExit: With status 1 when either variable is still missing
            after the reload attempt. A setup hint is printed first.
    """
    secret_id = os.environ.get("TENCENTCLOUD_SECRET_ID", "")
    secret_key = os.environ.get("TENCENTCLOUD_SECRET_KEY", "")
    if secret_id and secret_key:
        return credential.Credential(secret_id, secret_key)

    if _LOAD_ENV_AVAILABLE:
        print("[load_env] 环境变量未设置,尝试从系统文件自动加载...", file=sys.stderr)
        _ensure_env_loaded(verbose=True)
        secret_id = os.environ.get("TENCENTCLOUD_SECRET_ID", "")
        secret_key = os.environ.get("TENCENTCLOUD_SECRET_KEY", "")

    if not secret_id or not secret_key:
        if _LOAD_ENV_AVAILABLE:
            # Import only the name that is used: pulling in an unused private
            # name could raise ImportError and hide the hint from the user.
            from mps_load_env import _print_setup_hint
            _print_setup_hint(["TENCENTCLOUD_SECRET_ID", "TENCENTCLOUD_SECRET_KEY"])
        else:
            print(
                "\n错误:TENCENTCLOUD_SECRET_ID / TENCENTCLOUD_SECRET_KEY 未设置。\n"
                "请在 /etc/environment、~/.profile 等文件中添加这些变量。\n",
                file=sys.stderr,
            )
        sys.exit(1)
    return credential.Credential(secret_id, secret_key)
def create_mps_client(cred, region):
    """Return an ``MpsClient`` for *region* configured to POST to the MPS endpoint.

    Args:
        cred: Credential object handed to the SDK client.
        region: Region name handed to the SDK client.
    """
    transport = HttpProfile()
    transport.reqMethod = "POST"
    transport.endpoint = "mps.tencentcloudapi.com"

    profile = ClientProfile()
    profile.httpProfile = transport
    return mps_client.MpsClient(cred, region, profile)
def format_status(status):
    """Return the display label for *status*, or *status* itself if unmapped."""
    if status in STATUS_MAP:
        return STATUS_MAP[status]
    return status
def _try_print_cos_presigned_url(bucket, region, out_path, indent=" "):
    """Print a one-hour pre-signed GET URL for a COS object, best effort.

    Returns without output when ``bucket``, ``region`` or ``out_path`` is
    empty, or when the optional ``qcloud_cos`` SDK could not be imported.
    An exception raised while signing is reported as a warning line instead
    of propagating, so a failed link never aborts the surrounding report.

    Args:
        bucket: COS bucket name.
        region: COS region of the bucket.
        out_path: Object path; leading slashes are stripped to form the key.
        indent: Prefix placed before the printed line.
    """
    # Treat a missing region like the other missing inputs; callers that
    # check beforehand already require both bucket and region.
    if not (bucket and region and out_path) or not _COS_SDK_AVAILABLE:
        return
    try:
        cred = get_credentials()
        cos_config = CosConfig(Region=region, SecretId=cred.secret_id, SecretKey=cred.secret_key)
        cos_client = CosS3Client(cos_config)
        signed_url = cos_client.get_presigned_url(
            Bucket=bucket,
            Key=out_path.lstrip("/"),
            Method="GET",
            Expired=3600,
        )
        print(f"{indent}🔗 下载链接(预签名,1小时有效): {signed_url}")
    except Exception as e:
        # Deliberate catch-all: the link is a convenience, not part of the result.
        print(f"{indent}⚠️ 生成预签名 URL 失败: {e}")
def print_input_info(input_info):
    """Print a one-line description of a task's input source.

    Nothing is printed for a falsy ``input_info``. COS and URL inputs get a
    dedicated line; any other ``Type`` value is echoed as-is.
    """
    if not input_info:
        return

    kind = input_info.get("Type", "")
    if kind == "URL":
        source = input_info.get("UrlInputInfo", {}) or {}
        line = f" 输入: URL - {source.get('Url', '')}"
    elif kind == "COS":
        source = input_info.get("CosInputInfo", {}) or {}
        bucket = source.get("Bucket", "")
        key = source.get("Object", "")
        region = source.get("Region", "")
        line = f" 输入: COS - {bucket}:{key} (region: {region})"
    else:
        line = f" 输入类型: {kind}"
    print(line)
def print_meta_data(meta):
    """Print container, resolution, bitrate, duration and size of the source media.

    Nothing is printed for a falsy ``meta``. Fields that are absent or
    ``None`` are shown as zero (``N/A`` for the container) so that a
    partially filled metadata object cannot raise while formatting.
    """
    if not meta:
        return
    # ``or`` rather than a .get() default: a key present with a None value
    # would otherwise reach the numeric format specifiers below.
    duration = meta.get("Duration") or 0
    width = meta.get("Width") or 0
    height = meta.get("Height") or 0
    bitrate = meta.get("Bitrate") or 0
    container = meta.get("Container") or ""
    size = meta.get("Size") or 0
    print(f" 原始信息: {container.upper() if container else 'N/A'} | "
          f"{width}x{height} | "
          f"{bitrate // 1000 if bitrate else 0} kbps | "
          f"{duration:.1f}s | "
          f"{size / 1024 / 1024:.2f} MB")
def print_media_process_results(result_set):
    """Print status, error and output details for each media-processing sub-task.

    For every item the detail object is looked up by the item's ``Type``.
    Items whose type is unmapped or whose detail is empty are reported as
    having no details. When an ``Output`` is present, its storage location
    and, if a width or height is given, its media specs are printed too.

    Args:
        result_set: Iterable of sub-task result dicts; a falsy value prints
            a "none" line and returns.
    """
    if not result_set:
        print(" 子任务: 无")
        return

    # Sub-task type -> key holding that sub-task's detail object. Built once
    # rather than on every loop iteration.
    detail_keys = {
        "Transcode": "TranscodeTask",
        "AnimatedGraphics": "AnimatedGraphicsTask",
        "SnapshotByTimeOffset": "SnapshotByTimeOffsetTask",
        "SampleSnapshot": "SampleSnapshotTask",
        "ImageSprites": "ImageSpritesTask",
        "AdaptiveDynamicStreaming": "AdaptiveDynamicStreamingTask",
        "AudioExtract": "AudioExtractTask",
        "CoverBySnapshot": "CoverBySnapshotTask",
    }

    for i, item in enumerate(result_set, 1):
        task_type = item.get("Type", "")
        type_name = TASK_TYPE_MAP.get(task_type, task_type)
        task_key = detail_keys.get(task_type, "")
        task_detail = item.get(task_key) if task_key else None
        if not task_detail:
            print(f" [{i}] {type_name}: 无详情")
            continue

        status = task_detail.get("Status", "")
        err_code = task_detail.get("ErrCode") or 0
        message = task_detail.get("Message", "")
        progress = task_detail.get("Progress", None)
        status_str = format_status(status)
        progress_str = f" ({progress}%)" if progress is not None else ""
        err_str = f" | 错误码: {err_code} - {message}" if err_code != 0 else ""
        print(f" [{i}] {type_name}: {status_str}{progress_str}{err_str}")

        output = task_detail.get("Output") or {}
        if not output:
            continue

        # Output location.
        out_storage = output.get("OutputStorage") or {}
        out_path = output.get("Path") or ""
        if out_storage.get("Type", "") == "COS":
            cos_out = out_storage.get("CosOutputStorage") or {}
            bucket = cos_out.get("Bucket", "")
            region = cos_out.get("Region", "")
            print(f" 输出: COS - {bucket}:{out_path} (region: {region})")
            _try_print_cos_presigned_url(bucket, region, out_path)
        elif out_path:
            print(f" 输出: {out_path}")

        # Output media specs. ``or 0`` keeps a None value away from the
        # numeric format specifiers below.
        out_width = output.get("Width") or 0
        out_height = output.get("Height") or 0
        out_bitrate = output.get("Bitrate") or 0
        out_duration = output.get("Duration") or 0
        out_size = output.get("Size") or 0
        out_container = output.get("Container") or ""
        if out_width or out_height:
            print(f" 规格: {out_container.upper() if out_container else 'N/A'} | "
                  f"{out_width}x{out_height} | "
                  f"{out_bitrate // 1000 if out_bitrate else 0} kbps | "
                  f"{out_duration:.1f}s | "
                  f"{out_size / 1024 / 1024:.2f} MB")
# NOCA:CCN(complex function with multiple execution paths, splitting would reduce readability)
def print_ai_analysis_results(result_set):
    """Print status, error and output details for each AI-analysis sub-task.

    The detail object is looked up by the item's ``Type`` (``VRemake`` is
    treated as ``VideoRemake``). Items with an unmapped type or an empty
    detail are reported as having no details. Recognised output fields are
    printed when present; for ``VideoRemake`` the output path and, for COS
    storage, a pre-signed link are printed as well.

    Args:
        result_set: Iterable of analysis result dicts; falsy prints nothing.
    """
    if not result_set:
        return

    # Analysis type -> key holding that task's detail object. Built once
    # rather than on every loop iteration.
    detail_keys = {
        "Classification": "ClassificationTask",
        "Cover": "CoverTask",
        "Tag": "TagTask",
        "FrameTag": "FrameTagTask",
        "Highlight": "HighlightTask",
        "DeLogo": "DeLogoTask",
        "Description": "DescriptionTask",
        "Dubbing": "DubbingTask",
        "VideoRemake": "VideoRemakeTask",
        "VRemake": "VideoRemakeTask",  # abbreviated form the API actually returns
        "VideoComprehension": "VideoComprehensionTask",
        "Cutout": "CutoutTask",
        "Reel": "ReelTask",
        "HeadTail": "HeadTailTask",
        "HorizontalToVertical": "HorizontalToVerticalTask",
        "Segment": "SegmentTask",
    }

    for i, item in enumerate(result_set, 1):
        task_type = item.get("Type", "")
        task_key = detail_keys.get(task_type, "")
        normalized_type = "VideoRemake" if task_type == "VRemake" else task_type
        task_detail = item.get(task_key) if task_key else None
        if not task_detail:
            print(f" [{i}] AI分析-{task_type}: 无详情")
            continue

        status = task_detail.get("Status", "")
        err_code = task_detail.get("ErrCode") or 0
        err_code_ext = task_detail.get("ErrCodeExt", "")
        message = task_detail.get("Message", "")
        err_str = ""
        if err_code != 0:
            err_str = f" | 错误码: {err_code}"
            if err_code_ext:
                err_str += f" ({err_code_ext})"
            if message and message != "SUCCESS":
                err_str += f" - {message}"
        print(f" [{i}] AI分析-{task_type}: {format_status(status)}{err_str}")

        output = task_detail.get("Output") or {}
        if not output:
            continue

        # Classification results (at most 5 shown).
        classifications = output.get("Classifications") or []
        if classifications:
            print(" 分类结果:")
            for cls in classifications[:5]:
                name = cls.get("Name", "")
                # A None confidence would raise on multiplication.
                conf = cls.get("Confidence") or 0
                print(f" - {name} ({conf * 100:.1f}%)")

        # Tags (at most 10 shown); str() keeps join() safe for non-string items.
        tags = output.get("Tags") or []
        if tags:
            print(f" 标签: {', '.join(map(str, tags[:10]))}")

        # Cover image.
        cover_path = output.get("CoverPath", "")
        if cover_path:
            print(f" 封面: {cover_path}")

        # Description, truncated to 100 characters.
        description = output.get("Description") or ""
        if description:
            print(f" 描述: {description[:100]}{'...' if len(description) > 100 else ''}")

        # Highlight segments (at most 3 shown).
        highlights = output.get("Highlights") or []
        if highlights:
            print(" 精彩片段:")
            for hl in highlights[:3]:
                start = hl.get("StartTime", 0)
                end = hl.get("EndTime", 0)
                conf = hl.get("Confidence") or 0
                print(f" - {start}s - {end}s (置信度: {conf * 100:.1f}%)")

        # VideoRemake produces an output video; show where it was written.
        if normalized_type == "VideoRemake":
            out_path = output.get("Path", "")
            if out_path:
                confidence = output.get("Confidence", None)
                conf_str = f" (置信度: {confidence})" if confidence is not None else ""
                print(f" 输出路径: {out_path}{conf_str}")
                out_storage = output.get("OutputStorage") or {}
                if out_storage.get("Type", "") == "COS":
                    cos_out = out_storage.get("CosOutputStorage") or {}
                    bucket = cos_out.get("Bucket", "")
                    region = cos_out.get("Region", "")
                    if bucket and region:
                        _try_print_cos_presigned_url(bucket, region, out_path)
# NOCA:CCN(complex function with multiple execution paths, splitting would reduce readability)
def print_ai_recognition_results(result_set):
    """Print status, error and output summary for each AI-recognition sub-task.

    The detail object is looked up by the item's ``Type``. Items with an
    unmapped type or an empty detail are reported as having no details.
    Face and object results list up to five names with confidence; ASR and
    translation results show the subtitle file path; OCR shows a box count.

    Args:
        result_set: Iterable of recognition result dicts; falsy prints nothing.
    """
    if not result_set:
        return

    # Recognition type -> key holding that task's detail object. Built once
    # rather than on every loop iteration.
    detail_keys = {
        "Face": "FaceTask",
        "Asr": "AsrTask",
        "Ocr": "OcrTask",
        "Object": "ObjectTask",
        "AsrWords": "AsrWordsTask",
        "OcrWords": "OcrWordsTask",
        "TransText": "TransTextTask",
    }

    for i, item in enumerate(result_set, 1):
        task_type = item.get("Type", "")
        task_key = detail_keys.get(task_type, "")
        task_detail = item.get(task_key) if task_key else None
        if not task_detail:
            print(f" [{i}] AI识别-{task_type}: 无详情")
            continue

        status = task_detail.get("Status", "")
        err_code = task_detail.get("ErrCode") or 0
        err_code_ext = task_detail.get("ErrCodeExt", "")
        message = task_detail.get("Message", "")
        err_str = ""
        if err_code != 0:
            err_str = f" | 错误码: {err_code}"
            if err_code_ext:
                err_str += f" ({err_code_ext})"
            if message and message != "SUCCESS":
                err_str += f" - {message}"
        print(f" [{i}] AI识别-{task_type}: {format_status(status)}{err_str}")

        output = task_detail.get("Output") or {}
        if not output:
            continue

        if task_type == "Face":
            face_set = output.get("FaceSet") or []
            if face_set:
                print(f" 识别到 {len(face_set)} 个人脸:")
                for face in face_set[:5]:  # show at most 5
                    name = face.get("Name", "未知")
                    # A None confidence would raise on multiplication.
                    conf = face.get("Confidence") or 0
                    print(f" - {name} ({conf * 100:.1f}%)")
        elif task_type == "Asr":
            subtitle_path = output.get("SubtitlePath", "")
            if subtitle_path:
                print(f" 字幕文件: {subtitle_path}")
        elif task_type == "Ocr":
            text_set = output.get("TextSet") or []
            if text_set:
                print(f" 识别到 {len(text_set)} 个文本框")
        elif task_type == "Object":
            object_set = output.get("ObjectSet") or []
            if object_set:
                print(f" 识别到 {len(object_set)} 个物体:")
                for obj in object_set[:5]:  # show at most 5
                    name = obj.get("Name", "")
                    conf = obj.get("Confidence") or 0
                    print(f" - {name} ({conf * 100:.1f}%)")
        elif task_type == "TransText":
            subtitle_path = output.get("SubtitlePath", "")
            if subtitle_path:
                print(f" 翻译字幕文件: {subtitle_path}")
def print_ai_content_review_results(result_set):
    """Print status, error and verdict for each AI content-review sub-task.

    Each item's detail object is looked up by its ``Type``. Items with an
    unmapped type or an empty detail are reported as having no details.
    When an output is present, its suggestion, label and confidence are
    printed if they are non-empty.
    """
    if not result_set:
        return

    detail_key_by_type = {
        "Porn": "PornTask",
        "Terrorism": "TerrorismTask",
        "Political": "PoliticalTask",
        "Prohibited": "ProhibitedTask",
        "PoliticalAsr": "PoliticalAsrTask",
        "PoliticalOcr": "PoliticalOcrTask",
        "PornAsr": "PornAsrTask",
        "PornOcr": "PornOcrTask",
        "ProhibitedAsr": "ProhibitedAsrTask",
        "ProhibitedOcr": "ProhibitedOcrTask",
        "TerrorismOcr": "TerrorismOcrTask",
    }
    verdict_labels = {
        "pass": "通过",
        "review": "复核",
        "block": "拦截",
    }

    for idx, entry in enumerate(result_set, 1):
        review_type = entry.get("Type", "")
        detail_key = detail_key_by_type.get(review_type, "")
        detail = entry.get(detail_key, {}) if detail_key else None
        if not detail:
            print(f" [{idx}] AI审核-{review_type}: 无详情")
            continue

        code = detail.get("ErrCode") or 0
        ext = detail.get("ErrCodeExt", "")
        note = detail.get("Message", "")
        suffix = ""
        if code != 0:
            suffix = f" | 错误码: {code}"
            if ext:
                suffix += f" ({ext})"
            if note and note != "SUCCESS":
                suffix += f" - {note}"
        print(f" [{idx}] AI审核-{review_type}: {format_status(detail.get('Status', ''))}{suffix}")

        # Verdict fields, each shown only when non-empty.
        output = detail.get("Output", {})
        if not output:
            continue
        suggestion = output.get("Suggestion", "")
        label = output.get("Label", "")
        confidence = output.get("Confidence", 0)
        if suggestion:
            print(f" 建议: {verdict_labels.get(suggestion, suggestion)}")
        if label:
            print(f" 标签: {label}")
        if confidence:
            print(f" 置信度: {confidence * 100:.1f}%")
# NOCA:CCN(complex function with multiple execution paths, splitting would reduce readability)
def print_ai_quality_control_result(result):
    """Print the AI quality-control task status and its findings.

    After the status line, the following are printed when present in the
    task output: missing audio/video flags, the quality score, container
    diagnosis entries (with timestamps) and non-zero quality statistics.
    A task without an ``Output`` prints only the status line.

    Args:
        result: Quality-control result dict; falsy prints nothing.
    """
    if not result:
        return

    status = result.get("Status", "")
    err_code = result.get("ErrCode") or 0
    err_code_ext = result.get("ErrCodeExt", "")
    message = result.get("Message", "")
    status_str = format_status(status)
    err_str = ""
    if err_code != 0:
        err_str = f" | 错误码: {err_code}"
        if err_code_ext:
            err_str += f" ({err_code_ext})"
        if message and message != "SUCCESS":
            err_str += f" - {message}"
    print(f" AI质检: {status_str}{err_str}")

    # ``or {}``: a key present with a None value must not reach .get() below.
    output = result.get("Output") or {}

    # Missing audio / video tracks.
    issues = []
    if output.get("NoAudio", False):
        issues.append("缺失音频")
    if output.get("NoVideo", False):
        issues.append("缺失视频")
    if issues:
        print(f" ⚠️ {', '.join(issues)}")

    # Quality score.
    score = output.get("QualityEvaluationScore")
    if score is not None:
        print(f" 质量评分: {score}")

    # Container diagnosis entries.
    diagnose_results = output.get("ContainerDiagnoseResultSet") or []
    if diagnose_results:
        has_fatal = False
        print(" 质检诊断结果:")
        for diagnose in diagnose_results:
            category = diagnose.get("Category", "")
            type_name = diagnose.get("Type", "")
            severity = diagnose.get("SeverityLevel", "")
            if severity == "Fatal":
                has_fatal = True
                print(f" ❌ 【致命】{category}/{type_name}")
            elif severity == "Warning":
                print(f" ⚠️ 【警告】{category}/{type_name}")
            else:
                print(f" ℹ️ 【{severity}】{category}/{type_name}")
            timestamps = diagnose.get("TimestampSet") or []
            if timestamps:
                print(f" 时间点: {', '.join(map(str, timestamps))} 秒")
        # A finished task with a fatal finding is flagged explicitly.
        if has_fatal and status_str == "已完成":
            print(" ⚠️ 质检检测到致命错误,任务状态应为失败")

    # Quality statistics; only rows with a positive max or average are shown.
    qc_stats = output.get("QualityControlStatSet") or []
    if qc_stats:
        print(" 质检统计:")
        for stat in qc_stats:
            stat_type = stat.get("Type", "")
            avg_val = stat.get("AvgValue", 0)
            max_val = stat.get("MaxValue", 0)
            min_val = stat.get("MinValue", 0)
            # ``or 0`` guards the comparison against None values.
            if (max_val or 0) > 0 or (avg_val or 0) > 0:
                print(f" {stat_type}: 平均={avg_val}, 最大={max_val}, 最小={min_val}")
# NOCA:CCN(complex function with multiple execution paths, splitting would reduce readability)
def print_smart_subtitles_results(result_set):
    """Print status and errors for each smart-subtitle sub-task.

    Each item's detail object is looked up by its ``Type``. Items with an
    unmapped type or an empty detail are reported as having no details.
    Within a task's output, entries of the subtitle, OCR-recognition and
    subtitle-translation result lists are printed only when they carry a
    non-zero error code or a ``FAIL`` status.

    Args:
        result_set: Iterable of subtitle result dicts; falsy prints nothing.
    """
    if not result_set:
        return

    # Subtitle task type -> key holding that task's detail object.
    detail_keys = {
        "AsrFullText": "AsrFullTextTask",
        "TransText": "TransTextTask",
        "PureSubtitleTrans": "PureSubtitleTransTask",
        "OcrFullText": "OcrFullTextTask",
    }
    # (output key, printed label) for each nested result list, in print order.
    nested_sections = (
        ("SubtitleResults", "字幕结果"),
        ("RecognizeSubtitleResult", "OCR识别结果"),
        ("TransSubtitleResult", "字幕翻译结果"),
    )

    for i, item in enumerate(result_set, 1):
        task_type = item.get("Type", "")
        task_key = detail_keys.get(task_type, "")
        task_detail = item.get(task_key) if task_key else None
        if not task_detail:
            print(f" [{i}] 智能字幕-{task_type}: 无详情")
            continue

        status = task_detail.get("Status", "")
        err_code = task_detail.get("ErrCode") or 0
        err_code_ext = task_detail.get("ErrCodeExt", "")
        message = task_detail.get("Message", "")
        err_str = ""
        if err_code != 0:
            err_str = f" | 错误码: {err_code}"
            if err_code_ext:
                err_str += f" ({err_code_ext})"
            if message and message != "SUCCESS":
                err_str += f" - {message}"
        print(f" [{i}] 智能字幕-{task_type}: {format_status(status)}{err_str}")

        output = task_detail.get("Output") or {}
        if not output:
            continue

        for key, label in nested_sections:
            # ``or []``: a key present with a None value must not be iterated.
            for j, entry in enumerate(output.get(key) or [], 1):
                entry_status = entry.get("Status", "")
                entry_err = entry.get("ErrCode") or 0
                if entry_err == 0 and entry_status != "FAIL":
                    continue
                err_info = f"错误码: {entry_err}"
                entry_ext = entry.get("ErrCodeExt", "")
                if entry_ext:
                    err_info += f" ({entry_ext})"
                entry_msg = entry.get("Message", "")
                if entry_msg:
                    err_info += f" - {entry_msg}"
                print(f" {label}[{j}]: {format_status(entry_status)} | {err_info}")
def print_smart_erase_result(result):
    """Print the smart-erase task status, with error code and message on failure."""
    if not result:
        return

    code = result.get("ErrCode") or 0
    note = result.get("Message", "")
    pieces = []
    if code != 0:
        pieces.append(f" | 错误码: {code}")
        if note and note != "SUCCESS":
            pieces.append(f" - {note}")
    label = format_status(result.get("Status", ""))
    print(f" 智能擦除: {label}{''.join(pieces)}")
def print_edit_media_task(task):
    """Print the video-edit task status, its input and its COS output location.

    The output line (and a pre-signed link attempt) is produced only when
    the task has an output whose storage type is ``COS``.
    """
    if not task:
        return

    code = task.get("ErrCode") or 0
    note = task.get("Message", "")
    suffix = ""
    if code != 0:
        suffix = f" | 错误码: {code}"
        if note and note != "SUCCESS":
            suffix += f" - {note}"
    print(f" 视频编辑: {format_status(task.get('Status', ''))}{suffix}")

    # Input description.
    print_input_info(task.get("InputInfo"))

    # Output location.
    output = task.get("Output", {})
    if not output:
        return
    storage = output.get("OutputStorage", {}) or {}
    out_path = output.get("Path", "")
    if storage.get("Type", "") != "COS":
        return
    cos_out = storage.get("CosOutputStorage", {}) or {}
    bucket = cos_out.get("Bucket", "")
    region = cos_out.get("Region", "")
    print(f" 输出: COS - {bucket}:{out_path} (region: {region})")
    _try_print_cos_presigned_url(bucket, region, out_path)
def print_live_stream_task(task):
    """Print the live-stream processing task status, with error details on failure."""
    if not task:
        return

    code = task.get("ErrCode") or 0
    note = task.get("Message", "")
    pieces = []
    if code != 0:
        pieces.append(f" | 错误码: {code}")
        if note and note != "SUCCESS":
            pieces.append(f" - {note}")
    label = format_status(task.get("Status", ""))
    print(f" 直播流处理: {label}{''.join(pieces)}")
def print_extract_blind_watermark_task(task):
    """Print the blind-watermark extraction status and the extracted text, if any."""
    if not task:
        return

    code = task.get("ErrCode") or 0
    note = task.get("Message", "")
    suffix = ""
    if code != 0:
        suffix = f" | 错误码: {code}"
        if note and note != "SUCCESS":
            suffix += f" - {note}"
    print(f" 提取盲水印: {format_status(task.get('Status', ''))}{suffix}")

    # Extracted watermark text.
    extracted = task.get("OutputText", "")
    if extracted:
        print(f" 提取内容: {extracted}")
def print_schedule_activity_results(result_set):
    """Print the result of every activity in a schedule (orchestration) task.

    For each item the ``ActivityType`` is mapped to the key that holds the
    task detail inside ``ActivityResItem``. When that key is present the
    detail is printed via ``_print_schedule_subtask_error``; otherwise
    ``_print_unknown_activity_type`` scans the item for any known detail key.

    Args:
        result_set: Iterable of activity result dicts; falsy prints nothing.
    """
    if not result_set:
        return

    # ActivityType -> key of the task detail inside ActivityResItem.
    ACTIVITY_TYPE_MAP = {  # NOCA:invalid-name(naming follows SDK convention)
        "Transcode": "TranscodeTask",
        "AnimatedGraphics": "AnimatedGraphicsTask",
        "SnapshotByTimeOffset": "SnapshotByTimeOffsetTask",
        "SampleSnapshot": "SampleSnapshotTask",
        "ImageSprites": "ImageSpritesTask",
        "AdaptiveDynamicStreaming": "AdaptiveDynamicStreamingTask",
        "Recognition": "RecognitionTask",
        "Review": "ReviewTask",
        "Analysis": "AnalysisTask",
        "QualityControl": "QualityControlTask",
        "SmartSubtitles": "SmartSubtitlesTask",
        "SmartErase": "SmartEraseTask",
        "ExecRule": "ExecRuleTask",
        "AudioExtract": "AudioExtractTask",
        "CoverBySnapshot": "CoverBySnapshotTask",
        "Classification": "ClassificationTask",
        "Cover": "CoverTask",
        "Cutout": "CutoutTask",
        "DeLogo": "DeLogoTask",
        "Description": "DescriptionTask",
        "Dubbing": "DubbingTask",
        "FrameTag": "FrameTagTask",
        "HeadTail": "HeadTailTask",
        "Highlight": "HighlightTask",
        "HorizontalToVertical": "HorizontalToVerticalTask",
        "Reel": "ReelTask",
        "Segment": "SegmentTask",
        "Tag": "TagTask",
        "VideoComprehension": "VideoComprehensionTask",
        "VideoRemake": "VideoRemakeTask",
        "Face": "FaceTask",
        "Asr": "AsrTask",
        "AsrFullText": "AsrFullTextTask",
        "AsrWords": "AsrWordsTask",
        "Ocr": "OcrTask",
        "OcrFullText": "OcrFullTextTask",
        "OcrWords": "OcrWordsTask",
        "Object": "ObjectTask",
        "TransText": "TransTextTask",
        "Porn": "PornTask",
        "Terrorism": "TerrorismTask",
        "Political": "PoliticalTask",
        "Prohibited": "ProhibitedTask",
        "PoliticalAsr": "PoliticalAsrTask",
        "PoliticalOcr": "PoliticalOcrTask",
        "PornAsr": "PornAsrTask",
        "PornOcr": "PornOcrTask",
        "ProhibitedAsr": "ProhibitedAsrTask",
        "ProhibitedOcr": "ProhibitedOcrTask",
        "TerrorismOcr": "TerrorismOcrTask",
    }

    for i, item in enumerate(result_set, 1):
        # ``or {}``: a key present with a None value must not reach the
        # membership test below.
        activity_res = item.get("ActivityResItem") or {}
        activity_type = item.get("ActivityType", "")
        task_key = ACTIVITY_TYPE_MAP.get(activity_type)
        if task_key and task_key in activity_res:
            _print_schedule_subtask_error(activity_res[task_key], activity_type, i)
        else:
            # Unmapped type: fall back to scanning for any known detail key.
            _print_unknown_activity_type(activity_res, activity_type, i)
def _print_schedule_subtask_error(task, activity_type, index):
    """Print one schedule sub-task's status line, then any nested result errors.

    Args:
        task: Sub-task detail dict; falsy prints nothing.
        activity_type: Type identifier; translated to a display name when
            known, otherwise printed as given.
        index: 1-based position shown in the printed label.
    """
    if not task:
        return

    # Display names for the known activity types.
    display_names = {
        "Transcode": "转码",
        "AnimatedGraphics": "转动图",
        "SnapshotByTimeOffset": "时间点截图",
        "SampleSnapshot": "采样截图",
        "ImageSprites": "雪碧图",
        "AdaptiveDynamicStreaming": "自适应码流",
        "AudioExtract": "音频分离",
        "CoverBySnapshot": "截图做封面",
        "Recognition": "AI识别",
        "Review": "AI审核",
        "Analysis": "AI分析",
        "QualityControl": "质检",
        "SmartSubtitles": "智能字幕",
        "SmartErase": "智能擦除",
        "ExecRule": "规则执行",
        "Classification": "分类",
        "Cover": "封面",
        "Cutout": "抠图",
        "DeLogo": "去台标",
        "Description": "描述",
        "Dubbing": "配音",
        "FrameTag": "帧标签",
        "HeadTail": "片头片尾",
        "Highlight": "精彩集锦",
        "HorizontalToVertical": "横转竖",
        "Reel": "智能二创",
        "Segment": "分镜拆条",
        "Tag": "标签",
        "VideoComprehension": "视频理解",
        "VideoRemake": "视频二创/去重",
        "Face": "人脸识别",
        "Asr": "语音识别",
        "AsrFullText": "语音全文识别",
        "AsrWords": "语音分词识别",
        "Ocr": "文字识别",
        "OcrFullText": "文字全文识别",
        "OcrWords": "文字分词识别",
        "Object": "物体识别",
        "TransText": "翻译",
        "Porn": "涉黄审核",
        "Terrorism": "涉恐审核",
        "Political": "涉政审核",
        "Prohibited": "违禁审核",
        "PoliticalAsr": "涉政语音审核",
        "PoliticalOcr": "涉政文字审核",
        "PornAsr": "涉黄语音审核",
        "PornOcr": "涉黄文字审核",
        "ProhibitedAsr": "违禁语音审核",
        "ProhibitedOcr": "违禁文字审核",
        "TerrorismOcr": "涉恐文字审核",
    }

    code = task.get("ErrCode") or 0
    ext = task.get("ErrCodeExt", "")
    note = task.get("Message", "")
    percent = task.get("Progress")

    heading = f"编排-{display_names.get(activity_type, activity_type)}[{index}]"
    state = format_status(task.get("Status", ""))
    progress_part = "" if percent is None else f" ({percent}%)"

    # Error suffix: code, optional extended code, optional message.
    pieces = []
    if code != 0:
        pieces.append(f" | 错误码: {code}")
        if ext:
            pieces.append(f" ({ext})")
        if note and note != "SUCCESS":
            pieces.append(f" - {note}")

    print(f" {heading}: {state}{progress_part}{''.join(pieces)}")

    # Errors reported inside the task's output lists (e.g. subtitle results).
    _print_nested_task_errors(task, heading)
# NOCA:CCN(complex function with multiple execution paths, splitting would reduce readability)
def _print_nested_task_errors(task, parent_label):
    """Print errors found inside a task's ``Output`` result lists.

    Entries of the subtitle, OCR-recognition and subtitle-translation lists
    are printed only when they carry a non-zero error code or a ``FAIL``
    status. Container diagnosis entries are always printed, followed by a
    summary line when any of them is fatal.

    Args:
        task: Task detail dict; falsy, or lacking an output, prints nothing.
        parent_label: Label of the enclosing task. Accepted for interface
            compatibility; not used in the printed lines.
    """
    if not task:
        return
    output = task.get("Output") or {}
    if not output:
        return

    # (output key, printed label) for each nested result list, in print order.
    nested_sections = (
        ("SubtitleResults", "字幕结果"),
        ("RecognizeSubtitleResult", "OCR识别结果"),
        ("TransSubtitleResult", "字幕翻译结果"),
    )
    for key, label in nested_sections:
        # ``or []``: a key present with a None value must not be iterated.
        for j, entry in enumerate(output.get(key) or [], 1):
            entry_status = entry.get("Status", "")
            entry_err = entry.get("ErrCode") or 0
            if entry_err == 0 and entry_status != "FAIL":
                continue
            err_info = f"错误码: {entry_err}"
            entry_ext = entry.get("ErrCodeExt", "")
            if entry_ext:
                err_info += f" ({entry_ext})"
            entry_msg = entry.get("Message", "")
            if entry_msg:
                err_info += f" - {entry_msg}"
            print(f" └─ {label}[{j}]: {format_status(entry_status)} | {err_info}")

    # Container diagnosis entries of quality-control tasks.
    diagnose_results = output.get("ContainerDiagnoseResultSet") or []
    if not diagnose_results:
        return
    print(" └─ 质检诊断结果:")
    has_fatal = False
    for diagnose in diagnose_results:
        category = diagnose.get("Category", "")
        type_name = diagnose.get("Type", "")
        severity = diagnose.get("SeverityLevel", "")
        if severity == "Fatal":
            has_fatal = True
            print(f" ❌ 【致命】{category}/{type_name}")
        elif severity == "Warning":
            print(f" ⚠️ 【警告】{category}/{type_name}")
        else:
            print(f" ℹ️ 【{severity}】{category}/{type_name}")
        timestamps = diagnose.get("TimestampSet") or []
        if timestamps:
            print(f" 时间点: {', '.join(map(str, timestamps))} 秒")
    if has_fatal:
        print(" ⚠️ 质检检测到致命错误")
def _print_unknown_activity_type(activity_res, activity_type, index):
    """Report an activity whose ``ActivityType`` is not mapped to a detail key.

    Scans ``activity_res`` for the first known task key that holds a
    non-empty detail and prints it via ``_print_schedule_subtask_error``.
    When no such key is found, a warning naming the unrecognised type is
    printed instead.

    Args:
        activity_res: The activity's result dict; ``None`` is treated as empty.
        activity_type: The unrecognised type identifier, echoed in the warning.
        index: 1-based position shown in the printed label.
    """
    # (detail key, display name) for every known task field, in scan order.
    known_tasks = [
        ("TranscodeTask", "转码"),
        ("AnimatedGraphicsTask", "转动图"),
        ("SnapshotByTimeOffsetTask", "时间点截图"),
        ("SampleSnapshotTask", "采样截图"),
        ("ImageSpritesTask", "雪碧图"),
        ("AdaptiveDynamicStreamingTask", "自适应码流"),
        ("AudioExtractTask", "音频分离"),
        ("CoverBySnapshotTask", "截图做封面"),
        ("RecognitionTask", "AI识别"),
        ("ReviewTask", "AI审核"),
        ("AnalysisTask", "AI分析"),
        ("QualityControlTask", "质检"),
        ("SmartSubtitlesTask", "智能字幕"),
        ("SmartEraseTask", "智能擦除"),
        ("ExecRuleTask", "规则执行"),
        ("ClassificationTask", "分类"),
        ("CoverTask", "封面"),
        ("CutoutTask", "抠图"),
        ("DeLogoTask", "去台标"),
        ("DescriptionTask", "描述"),
        ("DubbingTask", "配音"),
        ("FrameTagTask", "帧标签"),
        ("HeadTailTask", "片头片尾"),
        ("HighlightTask", "精彩集锦"),
        ("HorizontalToVerticalTask", "横转竖"),
        ("ReelTask", "智能二创"),
        ("SegmentTask", "分镜拆条"),
        ("TagTask", "标签"),
        ("VideoComprehensionTask", "视频理解"),
        ("VideoRemakeTask", "视频二创/去重"),
        ("FaceTask", "人脸识别"),
        ("AsrTask", "语音识别"),
        ("AsrFullTextTask", "语音全文识别"),
        ("AsrWordsTask", "语音分词识别"),
        ("OcrTask", "文字识别"),
        ("OcrFullTextTask", "文字全文识别"),
        ("OcrWordsTask", "文字分词识别"),
        ("ObjectTask", "物体识别"),
        ("TransTextTask", "翻译"),
        ("PornTask", "涉黄审核"),
        ("TerrorismTask", "涉恐审核"),
        ("PoliticalTask", "涉政审核"),
        ("ProhibitedTask", "违禁审核"),
        ("PoliticalAsrTask", "涉政语音审核"),
        ("PoliticalOcrTask", "涉政文字审核"),
        ("PornAsrTask", "涉黄语音审核"),
        ("PornOcrTask", "涉黄文字审核"),
        ("ProhibitedAsrTask", "违禁语音审核"),
        ("ProhibitedOcrTask", "违禁文字审核"),
        ("TerrorismOcrTask", "涉恐文字审核"),
    ]
    activity_res = activity_res or {}
    for task_key, type_name in known_tasks:
        # Test the value, not key presence: a key that is present but holds
        # None would otherwise stop the scan and print nothing at all.
        task = activity_res.get(task_key)
        if task:
            _print_schedule_subtask_error(task, type_name, index)
            return
    # No known detail found: warn about the unrecognised type.
    print(f" 编排-未知类型[{index}] (ActivityType: {activity_type}): 无法识别的任务类型")
def print_media_task_error(task, label):
    """Print ``label`` with the task's status, progress and error details.

    Args:
        task: Task detail dict; falsy prints nothing.
        label: Text placed before the status.
    """
    if not task:
        return

    code = task.get("ErrCode") or 0
    ext = task.get("ErrCodeExt", "")
    note = task.get("Message", "")
    percent = task.get("Progress", None)

    state = format_status(task.get("Status", ""))
    progress_part = "" if percent is None else f" ({percent}%)"
    pieces = []
    if code != 0:
        pieces.append(f" | 错误码: {code}")
        if ext:
            pieces.append(f" ({ext})")
        if note and note != "SUCCESS":
            pieces.append(f" - {note}")
    print(f" {label}: {state}{progress_part}{''.join(pieces)}")
def print_ai_task_error(task, label):
    """Print ``label`` with the AI task's status and error details.

    Args:
        task: Task detail dict; falsy prints nothing.
        label: Text placed before the status.
    """
    if not task:
        return

    code = task.get("ErrCode") or 0
    ext = task.get("ErrCodeExt", "")
    note = task.get("Message", "")

    state = format_status(task.get("Status", ""))
    pieces = []
    if code != 0:
        pieces.append(f" | 错误码: {code}")
        if ext:
            pieces.append(f" ({ext})")
        if note and note != "SUCCESS":
            pieces.append(f" - {note}")
    print(f" {label}: {state}{''.join(pieces)}")
def print_schedule_task(task):
    """Print a schedule task's status, its input and every activity result."""
    if not task:
        return

    code = task.get("ErrCode") or 0
    note = task.get("Message", "")
    suffix = ""
    if code != 0:
        suffix = f" | 错误码: {code}"
        if note and note != "SUCCESS":
            suffix += f" - {note}"
    print(f" 编排任务: {format_status(task.get('Status', ''))}{suffix}")

    # Input description.
    print_input_info(task.get("InputInfo"))

    # Per-activity results, under a separator and a heading.
    print("-" * 60)
    print(" 编排活动结果:")
    activities = task.get("ActivityResultSet", [])
    print_schedule_activity_results(activities)
def print_live_schedule_task(task):
    """Print a live schedule task's status and its record / quality-control results.

    After the status line, each entry of ``LiveActivityResultSet`` is
    inspected for a live-record task and a live quality-control task; each
    one that is present and not ``None`` gets its own status line.

    Args:
        task: Live schedule task dict; falsy prints nothing.
    """
    if not task:
        return

    status = task.get("Status", "")
    err_code = task.get("ErrCode") or 0
    message = task.get("Message", "")
    err_str = ""
    if err_code != 0:
        err_str = f" | 错误码: {err_code}"
        if message and message != "SUCCESS":
            err_str += f" - {message}"
    print(f" 直播编排: {format_status(status)}{err_str}")

    live_results = task.get("LiveActivityResultSet") or []
    if not live_results:
        return

    # (detail key, printed label) for the live sub-tasks, in print order.
    live_sections = (
        ("LiveRecordTask", "直播录制"),
        ("LiveQualityControlTask", "直播质检"),
    )
    print("-" * 60)
    print(" 直播活动结果:")
    for i, item in enumerate(live_results, 1):
        # ``or {}``: the item may carry the key with a None value.
        live_res = item.get("LiveActivityResItem") or {}
        for key, label in live_sections:
            sub_task = live_res.get(key)
            # A key that is present but None means this entry is not of that
            # kind; calling .get() on it would raise.
            if sub_task is None:
                continue
            sub_err = sub_task.get("ErrCode") or 0
            sub_msg = sub_task.get("Message", "")
            sub_err_str = ""
            if sub_err != 0:
                sub_err_str = f" | 错误码: {sub_err}"
                if sub_msg and sub_msg != "SUCCESS":
                    sub_err_str += f" - {sub_msg}"
            print(f" [{i}] {label}: {format_status(sub_task.get('Status', ''))}{sub_err_str}")
def _print_workflow_task(workflow_task):
    """Print the sections of a WorkflowTask (a task submitted by ProcessMedia).

    Args:
        workflow_task: The non-empty ``WorkflowTask`` dict from the response.
    """
    wf_status = workflow_task.get("Status", "")
    wf_err = workflow_task.get("ErrCode") or 0
    wf_msg = workflow_task.get("Message", "")
    err_str = ""
    if wf_err != 0:
        err_str = f" | 错误码: {wf_err}"
        # Only append the message when there is one; a null/empty message
        # would otherwise be rendered as "- None" or a dangling "- ".
        if wf_msg and wf_msg != "SUCCESS":
            err_str += f" - {wf_msg}"
    print(f" 工作流状态: {format_status(wf_status)}{err_str}")

    # Input information
    print_input_info(workflow_task.get("InputInfo"))
    # Media metadata
    print_meta_data(workflow_task.get("MetaData"))

    # Media-processing sub-task results
    print("-" * 60)
    print(" 子任务结果:")
    print_media_process_results(workflow_task.get("MediaProcessResultSet", []))

    # AI task results
    print("-" * 60)
    print(" AI 任务结果:")
    print_ai_analysis_results(workflow_task.get("AiAnalysisResultSet", []))
    print_ai_recognition_results(workflow_task.get("AiRecognitionResultSet", []))
    print_ai_content_review_results(workflow_task.get("AiContentReviewResultSet", []))
    print_ai_quality_control_result(workflow_task.get("AiQualityControlTaskResult", {}))

    # Smart-subtitle tasks
    smart_subtitles = workflow_task.get("SmartSubtitlesTaskResultSet", [])
    if smart_subtitles:
        print("-" * 60)
        print(" 智能字幕任务:")
        print_smart_subtitles_results(smart_subtitles)

    # Smart-erase task
    smart_erase = workflow_task.get("SmartEraseTaskResult", {})
    if smart_erase:
        print("-" * 60)
        print(" 智能擦除任务:")
        print_smart_erase_result(smart_erase)


def _print_task_sections(result, task_type):
    """Print the type-specific part of the response.

    The first non-empty task object wins, checked in this order:
    WorkflowTask, EditMediaTask, LiveStreamProcessTask,
    ExtractBlindWatermarkTask, ScheduleTask, LiveScheduleTask. When none is
    present a hint pointing at --verbose / --json is printed instead.
    """
    workflow_task = result.get("WorkflowTask")
    if workflow_task:
        _print_workflow_task(workflow_task)
        return

    printers = (
        ("EditMediaTask", print_edit_media_task),
        ("LiveStreamProcessTask", print_live_stream_task),
        ("ExtractBlindWatermarkTask", print_extract_blind_watermark_task),
        ("ScheduleTask", print_schedule_task),
        ("LiveScheduleTask", print_live_schedule_task),
    )
    for key, printer in printers:
        task = result.get(key)
        if task:
            printer(task)
            return

    # Any other task type: tell the user how to see the full payload.
    print(f" 提示:该任务类型为 {task_type},当前脚本可能未完全支持此类型的详细展示。")
    print(" 如需查看完整信息,请使用 --verbose 或 --json 参数。")


def query_task(args):
    """Query the details of a media-processing task and print them.

    Args:
        args: Parsed command-line namespace. Reads ``region``, ``task_id``,
            ``json`` and ``verbose``.

    Returns:
        dict: The DescribeTaskDetail response decoded from the SDK's JSON
        string.

    On ``TencentCloudSDKException`` the error is written to stderr and the
    process exits with status 1.
    """
    # An empty TENCENTCLOUD_API_REGION is treated like an unset one.
    region = (
        args.region
        or os.environ.get("TENCENTCLOUD_API_REGION")
        or "ap-guangzhou"
    )

    # 1. Credentials and client
    cred = get_credentials()
    client = create_mps_client(cred, region)

    # 2. Request parameters
    params = {"TaskId": args.task_id}

    # 3. Call the API. The whole reporting path stays inside the try so that
    #    an SDK exception raised anywhere in it is reported the same way.
    try:
        req = models.DescribeTaskDetailRequest()
        req.from_json_string(json.dumps(params))
        resp = client.DescribeTaskDetail(req)
        result = json.loads(resp.to_json_string())

        # JSON-only mode: dump the response and stop.
        if args.json:
            print(json.dumps(result, ensure_ascii=False, indent=2))
            return result

        # Common header fields
        task_type = result.get("TaskType", "")
        status = result.get("Status", "")
        create_time = result.get("CreateTime", "")
        begin_time = result.get("BeginProcessTime", "")
        finish_time = result.get("FinishTime", "")

        print("=" * 60)
        print("腾讯云 MPS 媒体处理任务详情")
        print("=" * 60)
        print(f" TaskId: {args.task_id}")
        print(f" 任务类型: {task_type}")
        print(f" 状态: {format_status(status)}")
        print(f" 创建时间: {create_time}")
        if begin_time:
            print(f" 开始时间: {begin_time}")
        if finish_time:
            print(f" 完成时间: {finish_time}")
        print("-" * 60)

        _print_task_sections(result, task_type)

        print("-" * 60)
        print(f" RequestId: {result.get('RequestId', 'N/A')}")

        # Verbose mode: also dump the full JSON response.
        if args.verbose:
            print("\n完整响应:")
            print(json.dumps(result, ensure_ascii=False, indent=2))

        return result
    except TencentCloudSDKException as e:
        print(f"❌ 请求失败: {e}", file=sys.stderr)
        sys.exit(1)
def main():
    """Command-line entry point: parse the arguments and query the task.

    Options: ``--task-id`` (required), ``--region``, ``--verbose``/``-v`` and
    ``--json``. With ``--json`` no banner is printed, so stdout carries only
    the JSON document and can be piped into another tool.
    """
    parser = argparse.ArgumentParser(
        description="腾讯云 MPS 媒体处理任务查询 —— 查询 ProcessMedia 提交的任务状态和结果",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
# 查询指定任务
python mps_get_video_task.py --task-id 235303-WorkflowTask-80108cc3380155d98b2e3573a48a
# 查询并输出完整 JSON 响应
python mps_get_video_task.py --task-id 235303-WorkflowTask-80108cc3380155d98b2e3573a48a --verbose
# 仅输出原始 JSON(方便管道处理)
python mps_get_video_task.py --task-id 235303-WorkflowTask-80108cc3380155d98b2e3573a48a --json
环境变量:
TENCENTCLOUD_SECRET_ID 腾讯云 SecretId
TENCENTCLOUD_SECRET_KEY 腾讯云 SecretKey
"""
    )
    parser.add_argument("--task-id", type=str, required=True,
                        help="媒体处理任务 ID,由 ProcessMedia 接口返回")
    parser.add_argument("--region", type=str,
                        help="MPS 服务区域(默认 ap-guangzhou)")
    parser.add_argument("--verbose", "-v", action="store_true",
                        help="输出完整 JSON 响应")
    parser.add_argument("--json", action="store_true",
                        help="仅输出原始 JSON,不打印格式化摘要")
    args = parser.parse_args()

    # --json promises raw JSON only; a banner on stdout would make the
    # output unparseable for downstream tools.
    if not args.json:
        print("=" * 60)
        print("腾讯云 MPS 媒体处理任务查询")
        print("=" * 60)
        print(f"TaskId: {args.task_id}")
        print("-" * 60)

    query_task(args)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()