文件预览

console_automator.py

查看 Alibabacloud Safety Checker 技能包中的文件内容。

文件内容

scripts/console_automator.py

#!/usr/bin/env python3
"""
控制台浏览器自动化模块
功能:登录引导 → 导航到规则配置 → 复制Service → 配置标签开关 → 保存
使用 agent-browser 驱动,通过 subprocess 调用
"""
import argparse
import json
import os
import subprocess
import sys
import time
from datetime import datetime


class ConsoleAutomator:
    """内容安全控制台自动化"""

    SESSION = "contentsec"
    STATE_FILE = os.path.expanduser("~/.qoderwork/contentsec_console_state.json")

    # 控制台URL
    LOGIN_URL = "https://signin.aliyun.com"
    CONSOLE_URL = "https://yundun.console.aliyun.com/?p=contentsec#/green/textEnhanced"

    # 标签开关的CSS选择器模式(需要在实际控制台中确认)
    # 每个标签在规则编辑页是一个开关控件

    def __init__(self, headless: bool = False):
        self.headless = headless
        self.mode_flag = "--headless" if headless else "--headed"

    def _run(self, *args, timeout: int = 30) -> subprocess.CompletedProcess:
        """执行 agent-browser 命令"""
        cmd = ["agent-browser", "--session", self.SESSION, self.mode_flag, *args]
        return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)

    def _run_sync(self, *args, timeout: int = 30) -> subprocess.CompletedProcess:
        """执行 agent-browser 命令(不带headed/headless,用于非open命令)"""
        cmd = ["agent-browser", "--session", self.SESSION, *args]
        return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)

    def launch_and_login(self):
        """启动浏览器并引导用户登录"""
        print("\n" + "=" * 60)
        print("正在打开阿里云登录页面...")
        print("=" * 60)

        # 先清理可能的旧会话
        self._run_sync("close")
        time.sleep(1)

        # 打开登录页
        result = self._run("open", self.LOGIN_URL)
        if result.returncode != 0:
            print(f"[ERROR] 浏览器启动失败: {result.stderr}")
            print("请确认已安装 agent-browser: agent-browser install")
            sys.exit(1)

        print("\n浏览器已打开,请在浏览器中完成登录(扫码/密码/短信)。")
        print("登录成功后,按 Enter 继续...")
        input()

        # 保存登录状态
        self._run_sync("state", "save", self.STATE_FILE)
        print(f"✓ 登录状态已保存到 {self.STATE_FILE}")
        print("  下次使用时可直接加载状态,无需重新登录")

        # 验证是否已登录
        self._run_sync("get", "title")
        self._run_sync("snapshot", "-i")
        return True

    def restore_session(self):
        """加载已保存的登录状态"""
        if not os.path.exists(self.STATE_FILE):
            return False

        self._run_sync("close")
        time.sleep(1)
        self._run_sync("state", "load", self.STATE_FILE)

        # 打开控制台验证
        result = self._run("open", self.CONSOLE_URL)
        if result.returncode != 0:
            return False

        time.sleep(2)
        result = self._run_sync("get", "title")
        return result.returncode == 0

    def navigate_to_text_rules(self):
        """导航到文本审核增强版规则配置页"""
        print("\n正在导航到文本审核规则配置...")
        self._run("open", self.CONSOLE_URL)
        self._run_sync("wait", "--load", "networkidle")
        time.sleep(2)

        # 获取页面快照
        result = self._run_sync("snapshot", "-i")
        print(result.stdout)
        return result.stdout

    def copy_service(self, base_service: str, new_service_name: str):
        """复制基础Service创建自定义Service"""
        print(f"\n正在复制 Service: {base_service} → {new_service_name}")

        # 先尝试快照
        result = self._run_sync("snapshot", "-i")
        snapshot = result.stdout

        # 查找目标 service 的复制按钮
        # 在控制台,每行 service 有一个操作列,包含"复制"按钮
        # 需要根据实际页面结构调整选择器

        # 方式1: 通过文本定位
        result = self._run_sync("find", "text", base_service, "snapshot", "-i")
        if result.returncode == 0:
            print(f"✓ 找到 Service: {base_service}")
        else:
            print(f"[WARN] 未通过文本找到 {base_service},尝试其他方式...")
            print("当前页面内容:")
            print(snapshot[:2000])
            return False

        # 查找复制按钮(通常在同一行的操作列)
        # 使用 find role button with name "复制"
        result = self._run_sync("find", "role", "button", "click", "--name", "复制")

        if result.returncode != 0:
            # 备用方案:使用文本查找 "复制"
            result = self._run_sync("find", "text", "复制", "click")

        if result.returncode != 0:
            print(f"[WARN] 未找到复制按钮,可能需要手动操作")
            return False

        # 等待对话框出现
        self._run_sync("wait", "2000")

        # 在对话框中输入新 Service 名称
        result = self._run_sync("snapshot", "-i")
        print(f"复制对话框内容:\n{result.stdout[:2000]}")

        # 填充服务名称
        result = self._run_sync("find", "placeholder", "服务名称", "type", new_service_name)
        if result.returncode != 0:
            result = self._run_sync("find", "label", "服务名称", "type", new_service_name)

        # 点击确定
        self._run_sync("find", "text", "确定", "click")

        # 等待创建完成
        self._run_sync("wait", "3000")

        print(f"✓ Service {new_service_name} 创建完成")
        return True

    def open_service_rules(self, service_name: str):
        """打开指定Service的规则配置页"""
        print(f"\n正在打开 {service_name} 的规则配置...")

        # 在列表中找到目标 Service,点击"设置规则"
        result = self._run_sync("find", "text", service_name, "snapshot", "-i")

        # 查找并点击"设置规则"按钮
        result = self._run_sync("find", "text", "设置规则", "click")
        if result.returncode != 0:
            result = self._run_sync("find", "text", "管理检测规则", "click")

        self._run_sync("wait", "--load", "networkidle")
        time.sleep(3)

        # 获取当前页面
        result = self._run_sync("get", "url")
        print(f"当前URL: {result.stdout}")

        return True

    def enter_edit_mode(self):
        """进入编辑模式"""
        print("\n进入编辑模式...")

        # 点击"编辑"按钮
        result = self._run_sync("find", "text", "编辑", "click")
        if result.returncode != 0:
            print("[WARN] 未找到编辑按钮,可能已在编辑模式")

        time.sleep(2)
        result = self._run_sync("snapshot", "-i")
        print(f"规则编辑页快照:\n{result.stdout[:5000]}")
        return True

    def toggle_label(self, label_key: str, enable: bool):
        """
        切换指定标签的开关
        label_key: 标签key,如 "pornographic_adult"
        enable: True=开启,False=关闭
        """
        # 在规则编辑页,每个标签对应一个开关控件
        # 需要根据实际页面结构适配

        # 方案1: 通过标签名旁边的开关定位
        # 通常开关是 switch / toggle 组件

        # 先找标签文本,然后找其相邻的开关
        result = self._run_sync("find", "text", label_key, "snapshot", "-i")

        if result.returncode == 0:
            # 找到标签文本,查找相邻开关
            # 这需要根据实际DOM结构调整
            pass

        # 备用方案:使用 switch 组件的通用模式
        action = "check" if enable else "uncheck"

        # 尝试通过label关联定位switch
        result = self._run_sync("find", "label", label_key, action)
        if result.returncode != 0:
            # 尝试role方式
            result = self._run_sync("find", "role", "switch", action, "--name", label_key)

        return result.returncode == 0

    def configure_labels(self, label_config: dict):
        """
        批量配置标签开关
        label_config: {label_key: LabelSwitch} 配置字典
        """
        print(f"\n开始配置 {len(label_config)} 个标签...")

        from scenarios import LabelSwitch
        success_count = 0
        fail_count = 0

        for label_key, switch in label_config.items():
            if switch == LabelSwitch.OFF:
                enable = False
            else:
                enable = True

            status = "开启" if enable else "关闭"
            print(f"  [{status}] {label_key} ... ", end="", flush=True)

            if self.toggle_label(label_key, enable):
                print("✓")
                success_count += 1
            else:
                print("✗ (可能需要手动配置)")
                fail_count += 1

            time.sleep(0.5)  # 避免操作过快

        print(f"\n配置完成: 成功 {success_count}, 失败 {fail_count}")
        return success_count, fail_count

    def save_rules(self):
        """保存规则配置"""
        print("\n正在保存规则配置...")

        result = self._run_sync("find", "text", "保存", "click")
        if result.returncode != 0:
            result = self._run_sync("find", "role", "button", "click", "--name", "保存")

        self._run_sync("wait", "3000")

        print("✓ 规则已保存,等待 2~5 分钟生效...")
        print("  提示:生效后即可在API调用中使用新的service参数")

    def run_full_workflow(self, scenario_id: str, label_config: dict,
                          base_service: str, custom_service: str):
        """
        完整工作流:登录 → 复制Service → 配置标签 → 保存
        """
        from scenarios import get_scenario

        scenario = get_scenario(scenario_id)
        if not scenario:
            print(f"[ERROR] 未找到场景: {scenario_id}")
            return False

        print(f"\n{'=' * 60}")
        print(f"  内容安全控制台配置 - {scenario.name}")
        print(f"{'=' * 60}")
        print(f"  基础Service: {base_service}")
        print(f"  自定义Service: {custom_service}")
        print(f"  标签数量: {len(label_config)}")
        print(f"{'=' * 60}\n")

        # Step 1: 确保已登录
        if not self.restore_session():
            print("未找到有效的登录状态,引导用户登录...")
            self.launch_and_login()
        else:
            print("✓ 已恢复登录状态")

        # Step 2: 导航到规则配置
        self.navigate_to_text_rules()
        time.sleep(2)

        # Step 3: 复制Service
        self.copy_service(base_service, custom_service)
        time.sleep(2)

        # Step 4: 打开新Service的规则配置
        self.open_service_rules(custom_service)
        time.sleep(1)

        # Step 5: 进入编辑模式
        self.enter_edit_mode()
        time.sleep(1)

        # Step 6: 配置标签
        success, fail = self.configure_labels(label_config)
        time.sleep(1)

        # Step 7: 保存
        self.save_rules()

        print(f"\n{'=' * 60}")
        print(f"  控制台配置完成!")
        print(f"  Service: {custom_service}")
        print(f"  状态: 成功 {success} / 失败 {fail}")
        print(f"  请等待 2~5 分钟后在API调用中使用")
        print(f"{'=' * 60}\n")

        return fail == 0


def main():
    parser = argparse.ArgumentParser(description="内容安全控制台自动化配置")
    parser.add_argument("--scenario", "-s", required=True, help="场景ID")
    parser.add_argument("--headless", action="store_true", help="使用无头模式")
    parser.add_argument("--new-login", action="store_true", help="强制重新登录")
    args = parser.parse_args()

    from scenarios import get_scenario

    scenario = get_scenario(args.scenario)
    if not scenario:
        print(f"[ERROR] 未找到场景: {args.scenario}")
        print("可用场景: " + ", ".join(s.id for s in __import__('scenarios').list_scenarios()))
        sys.exit(1)

    custom_service = f"{scenario.base_service}_{scenario.recommended_service_suffix}"

    automator = ConsoleAutomator(headless=args.headless)

    if args.new_login:
        automator.launch_and_login()
        print("登录完成,请重新运行配置命令")
        return

    automator.run_full_workflow(
        scenario_id=args.scenario,
        label_config=scenario.label_config,
        base_service=scenario.base_service,
        custom_service=custom_service,
    )


if __name__ == "__main__":
    import os; os.chdir(os.path.dirname(os.path.abspath(__file__)))
    main()