文件预览

client.py

查看 Alibabacloud Safety Checker 技能包中的文件内容。

文件内容

scripts/client.py

"""
API客户端 - 封装阿里云内容安全各模态审核接口
支持:文本审核增强版PLUS、图片审核增强版、语音审核增强版、视频审核增强版
"""
import json
import time
import logging
from datetime import datetime
from typing import Dict, Optional, Tuple

from alibabacloud_green20220302.client import Client
from alibabacloud_green20220302 import models
from alibabacloud_tea_openapi.models import Config
from alibabacloud_tea_util.models import RuntimeOptions
from alibabacloud_credentials.client import Client as CredClient

from config import (
    ENDPOINT,
    REGION_ID,
)
from models import ModalityType, ModerationResult

# Module-level logger named after this module; no handlers or levels are configured here.
logger = logging.getLogger(__name__)


class ContentModerationClient:
    """Client wrapping Alibaba Cloud content-moderation calls for four modalities.

    Text and image moderation are synchronous: the verdict is parsed from the
    response of the call itself. Audio and video moderation are asynchronous:
    ``moderate_audio`` / ``moderate_video`` only submit a task and record its
    ``task_id``; ``poll_audio_result`` / ``poll_video_result`` fetch the
    outcome afterwards.

    Failures of the API call (non-200 HTTP status, non-200 body code, or an
    exception raised while calling/parsing) are not raised to the caller; they
    are reported through ``success`` / ``error_code`` / ``error_message`` on
    the returned result.
    """

    def __init__(self):
        """Create the SDK client and the runtime options shared by every call."""
        self._client = self._create_client()
        self._runtime = RuntimeOptions()

    def _create_client(self) -> "Client":
        """Build the SDK client; credentials come from the default credential chain."""
        cred = CredClient()
        config = Config(
            credential=cred,
            region_id=REGION_ID,
            endpoint=ENDPOINT,
            user_agent="AlibabaCloud-Agent-Skills/alibabacloud-safety-checker",
            # The Tea SDK expresses timeouts in milliseconds: 30 s to read,
            # 10 s to connect. (Plain 30 / 10 would mean 30 ms / 10 ms.)
            read_timeout=30_000,
            connect_timeout=10_000,
        )
        return Client(config)

    # ==================== Text moderation ====================

    def moderate_text(
        self, content: str, service: str, data_id: str = ""
    ) -> "ModerationResult":
        """Moderate a piece of text (Text Moderation Enhanced PLUS, synchronous).

        Args:
            content: Text to check.
            service: Service code, e.g. ``ugc_moderation_byllm``,
                ``ugc_moderation_byllm_pro``, ``aigc_moderation_byllm`` or
                ``ugc_moderation_byllm_cb``.
            data_id: Optional caller-side id. When given it is sent as
                ``dataId`` and used as the result's ``sample_id``; otherwise a
                timestamp-based ``sample_id`` is generated.

        Returns:
            The populated result record (risk level, labels, confidences and
            reason on success; error fields otherwise).
        """
        result, started = self._begin(ModalityType.TEXT, "text", service, data_id)
        request = models.TextModerationPlusRequest(
            service=service,
            service_parameters=self._encode_parameters({"content": content}, data_id),
        )
        return self._execute(
            result=result,
            service=service,
            started=started,
            invoke=lambda: self._client.text_moderation_plus_with_options(
                request, self._runtime
            ),
            on_success=self._record_verdict,
            message_attr="message",
            log_label="文本审核异常",
        )

    # ==================== Image moderation ====================

    def moderate_image(
        self, image_url: str, service: str, data_id: str = ""
    ) -> "ModerationResult":
        """Moderate an image by URL (Image Moderation Enhanced, synchronous).

        Args:
            image_url: URL of the image, sent as ``imageUrl``.
            service: Service code, e.g. ``baselineCheck``, ``baselineCheck_pro``,
                ``baselineCheckByVL``, ``aigcCheck``, ``profilePhotoCheck``,
                ``postImageCheck``.
            data_id: Optional caller-side id. When given it is sent as
                ``dataId`` and used as the result's ``sample_id``; otherwise a
                timestamp-based ``sample_id`` is generated.

        Returns:
            The populated result record (risk level, labels, confidences and
            reason on success; error fields otherwise).
        """
        result, started = self._begin(ModalityType.IMAGE, "image", service, data_id)
        request = models.ImageModerationRequest(
            service=service,
            service_parameters=self._encode_parameters({"imageUrl": image_url}, data_id),
        )
        return self._execute(
            result=result,
            service=service,
            started=started,
            invoke=lambda: self._client.image_moderation_with_options(
                request, self._runtime
            ),
            on_success=self._record_verdict,
            # The image response body exposes its error text as ``msg``.
            message_attr="msg",
            log_label="图片审核异常",
        )

    # ==================== Audio moderation (asynchronous) ====================

    def moderate_audio(
        self, audio_url: str, service: str = "audio_media_detection", data_id: str = ""
    ) -> "ModerationResult":
        """Submit an audio moderation task (asynchronous: submit, then poll).

        Args:
            audio_url: URL of the audio, sent as ``url``.
            service: Service code.
            data_id: Optional caller-side id. When given it is sent as
                ``dataId`` and used as the result's ``sample_id``; otherwise a
                timestamp-based ``sample_id`` is generated.

        Returns:
            A result flagged ``is_async=True``. On success ``task_id`` holds
            the id to pass to ``poll_audio_result``; no verdict is filled in.
        """
        result, started = self._begin(
            ModalityType.AUDIO, "audio", service, data_id, is_async=True
        )
        request = models.VoiceModerationRequest(
            service=service,
            service_parameters=self._encode_parameters({"url": audio_url}, data_id),
        )
        return self._execute(
            result=result,
            service=service,
            started=started,
            invoke=lambda: self._client.voice_moderation_with_options(
                request, self._runtime
            ),
            on_success=self._record_task,
            message_attr="message",
            log_label="音频审核提交异常",
        )

    def poll_audio_result(
        self, task_id: str, service: str = "audio_media_detection"
    ) -> Optional[Dict]:
        """Fetch the outcome of a previously submitted audio task.

        Returns:
            The response body as a dict when the HTTP status and the body code
            are both 200 and the body carries data; ``None`` otherwise,
            including when the call raises (the error is logged). No further
            completion check is performed here.
        """
        request = models.VoiceModerationResultRequest(
            service=service,
            service_parameters=json.dumps({"taskId": task_id}),
        )
        return self._fetch_task_result(
            lambda: self._client.voice_moderation_result_with_options(
                request, self._runtime
            ),
            "音频结果查询异常",
            task_id,
        )

    # ==================== Video moderation (asynchronous) ====================

    def moderate_video(
        self, video_url: str, service: str = "videoDetection", data_id: str = ""
    ) -> "ModerationResult":
        """Submit a video moderation task (asynchronous: submit, then poll).

        Args:
            video_url: URL of the video, sent as ``url``.
            service: Service code.
            data_id: Optional caller-side id. When given it is sent as
                ``dataId`` and used as the result's ``sample_id``; otherwise a
                timestamp-based ``sample_id`` is generated.

        Returns:
            A result flagged ``is_async=True``. On success ``task_id`` holds
            the id to pass to ``poll_video_result``; no verdict is filled in.
        """
        result, started = self._begin(
            ModalityType.VIDEO, "video", service, data_id, is_async=True
        )
        request = models.VideoModerationRequest(
            service=service,
            service_parameters=self._encode_parameters({"url": video_url}, data_id),
        )
        return self._execute(
            result=result,
            service=service,
            started=started,
            invoke=lambda: self._client.video_moderation_with_options(
                request, self._runtime
            ),
            on_success=self._record_task,
            message_attr="message",
            log_label="视频审核提交异常",
        )

    def poll_video_result(
        self, task_id: str, service: str = "videoDetection"
    ) -> Optional[Dict]:
        """Fetch the outcome of a previously submitted video task.

        Returns:
            The response body as a dict when the HTTP status and the body code
            are both 200 and the body carries data; ``None`` otherwise,
            including when the call raises (the error is logged). No further
            completion check is performed here.
        """
        request = models.VideoModerationResultRequest(
            service=service,
            service_parameters=json.dumps({"taskId": task_id}),
        )
        return self._fetch_task_result(
            lambda: self._client.video_moderation_result_with_options(
                request, self._runtime
            ),
            "视频结果查询异常",
            task_id,
        )

    # ==================== Helpers ====================

    @staticmethod
    def _begin(
        modality, id_prefix: str, service: str, data_id: str, is_async: bool = False
    ) -> Tuple["ModerationResult", float]:
        """Create the result record for a new request and start the latency clock.

        Returns ``(result, started)`` where ``started`` is a
        ``time.perf_counter()`` reading to hand to ``_execute``.
        """
        fields = {
            "sample_id": data_id or f"{id_prefix}_{int(time.time() * 1000)}",
            "service": service,
            "request_id": "",
            "modality": modality,
            "request_time": datetime.now().isoformat(),
        }
        # Only async submissions pass the flag; sync results keep the default.
        if is_async:
            fields["is_async"] = True
        return ModerationResult(**fields), time.perf_counter()

    @staticmethod
    def _encode_parameters(payload: Dict, data_id: str) -> str:
        """Serialize the service parameters, adding ``dataId`` when one was given."""
        params = dict(payload)
        if data_id:
            params["dataId"] = data_id
        return json.dumps(params)

    def _execute(
        self, *, result, service, started, invoke, on_success, message_attr, log_label
    ):
        """Run one API call and fold its outcome into ``result``.

        Args:
            result: Result record to fill in; also the return value.
            service: Service code, used only in the error log line.
            started: ``time.perf_counter()`` reading taken when the request began.
            invoke: Zero-argument callable performing the SDK call. It is
                called inside the ``try`` so that any failure of the call is
                reported on ``result`` instead of being raised.
            on_success: ``callable(result, body)`` applied when both the HTTP
                status and the body code are 200.
            message_attr: Name of the body attribute holding the error text.
            log_label: Prefix of the error log line.
        """
        try:
            response = invoke()
            result.latency_ms = (time.perf_counter() - started) * 1000
            result.response_time = datetime.now().isoformat()

            if response.status_code == 200:
                body = response.body
                result.request_id = body.request_id or ""
                if body.code == 200:
                    result.success = True
                    on_success(result, body)
                else:
                    result.success = False
                    result.error_code = str(body.code)
                    result.error_message = getattr(body, message_attr) or ""
            else:
                result.success = False
                result.error_code = str(response.status_code)
                result.error_message = "HTTP请求失败"
        except Exception as e:
            # Deliberate catch-all boundary: callers get a failed result, not a raise.
            result.success = False
            result.error_message = str(e)
            result.response_time = datetime.now().isoformat()
            result.latency_ms = (time.perf_counter() - started) * 1000
            logger.error("%s [service=%s]: %s", log_label, service, e)
        return result

    def _record_verdict(self, result, body) -> None:
        """Copy a synchronous verdict (risk level, labels, reason) onto ``result``.

        Does nothing when the body carries no data. When several items have a
        description, the last one wins as ``reason``.
        """
        data = body.data
        if not data:
            return
        result.risk_level = data.risk_level or "none"
        for item in data.result or []:
            if item.label:
                result.labels.append(item.label)
                # Confidence is keyed by label, so it is only meaningful (and
                # only stored) for items that actually have one.
                if item.confidence is not None:
                    result.confidence[item.label] = item.confidence
            if item.description:
                result.reason = item.description
        result.raw_response = self._body_to_dict(body)

    def _record_task(self, result, body) -> None:
        """Store the task id of an accepted asynchronous submission on ``result``."""
        data = body.data
        if data and data.task_id:
            result.task_id = data.task_id
        result.raw_response = self._body_to_dict(body)

    def _fetch_task_result(self, invoke, log_label: str, task_id: str) -> Optional[Dict]:
        """Run a result-query call; return the body as a dict, or ``None``.

        ``None`` is returned for a non-200 HTTP status, a non-200 body code,
        an empty ``data`` field, or any exception (which is logged).
        """
        try:
            response = invoke()
            if response.status_code == 200:
                body = response.body
                if body.code == 200 and body.data:
                    return self._body_to_dict(body)
            return None
        except Exception as e:
            logger.error("%s [task_id=%s]: %s", log_label, task_id, e)
            return None

    @staticmethod
    def _body_to_dict(body) -> Dict:
        """Convert an SDK response body to a dict, keeping as much raw data as possible.

        Uses ``body.to_map()`` when available; falls back to
        ``{"raw": str(body)}`` when it is missing or raises.
        """
        try:
            if hasattr(body, "to_map"):
                return body.to_map()
            return {"raw": str(body)}
        except Exception:
            return {"raw": str(body)}