文件预览

cst.py

查看 Alibabacloud Safety Checker 技能包中的文件内容。

文件内容

scripts/cst.py

#!/usr/bin/env python3
"""
内容安全自动化测试 - 一站式入口脚本
整合:场景选择 → 凭证验证 → 控制台配置 → 测试运行 → 标注 → 报告

用法:
  python cst.py init              # 初始化:验证凭证链
  python cst.py configure <场景>   # 控制台配置标签开关
  python cst.py test <样本>        # 运行审核测试
  python cst.py annotate [文件]    # 人工标注
  python cst.py report [文件]      # 生成对齐报告
  python cst.py guardrail         # AI安全护栏效果测试
  python cst.py status            # 查看当前状态
  python cst.py full <场景> <样本>  # 完整流程:配置→测试→报告
"""
import argparse
import json
import os
import sys
import subprocess
from datetime import datetime

SKILL_DIR = os.path.dirname(os.path.abspath(__file__))
RESULTS_DIR = os.path.join(SKILL_DIR, "results")
SAMPLES_DIR = os.path.join(SKILL_DIR, "samples")
STATE_FILE = os.path.expanduser("~/.qoderwork/contentsec_console_state.json")


def ensure_results_dir():
    os.makedirs(RESULTS_DIR, exist_ok=True)


def run_python(script: str, *args, capture: bool = False) -> int:
    """运行 Python 脚本"""
    cmd = [sys.executable, os.path.join(SKILL_DIR, script)] + list(args)
    result = subprocess.run(cmd, cwd=SKILL_DIR, capture_output=capture, text=True)
    if not capture:
        print(result.stdout, end="")
        if result.stderr:
            print(result.stderr, end="")
    return result.returncode


def cmd_init(args):
    """初始化:验证凭证链连通性"""
    print("=" * 60)
    print("  内容安全自动化测试 - 初始化")
    print("=" * 60)

    print("\n正在通过默认凭证链验证连通性...\n")

    rc = run_python("verify_auth.py")
    if rc == 0:
        # 保存配置状态
        state = {
            "initialized_at": datetime.now().isoformat(),
            "verified": True,
        }
        state_path = os.path.join(RESULTS_DIR, "cst_state.json")
        with open(state_path, "w") as f:
            json.dump(state, f, indent=2)
        print(f"\n✓ 初始化完成,状态已保存")
    else:
        print(f"\n✗ 验证失败,请检查凭证链配置和权限设置")
        sys.exit(1)


def cmd_configure(args):
    """控制台配置:浏览器自动化"""
    from scenarios import get_scenario, list_scenarios

    scenario = get_scenario(args.scenario)
    if not scenario:
        print(f"[ERROR] 未找到场景: {args.scenario}")
        print("\n可用场景:")
        for s in list_scenarios():
            print(f"  {s.id:30s} {s.name}")
        sys.exit(1)

    custom_service = f"{scenario.base_service}_{scenario.recommended_service_suffix}"

    print("=" * 60)
    print(f"  控制台配置 - {scenario.name}")
    print("=" * 60)
    print(f"  基础Service:  {scenario.base_service}")
    print(f"  自定义Service: {custom_service}")
    print(f"  标签数量:    {len(scenario.label_config)}")
    print("=" * 60)

    # 展示配置矩阵
    run_python("scenario_manager.py", "show", args.scenario)

    if not args.yes:
        confirm = input("\n是否继续执行控制台配置?(y/N) ")
        if confirm.lower() != "y":
            print("已取消")
            return

    # 执行浏览器自动化
    headless_flag = "--headless" if args.headless else ""
    cmd = [
        sys.executable,
        os.path.join(SKILL_DIR, "console_automator.py"),
        "--scenario", args.scenario,
    ]
    if args.headless:
        cmd.append("--headless")
    if args.new_login:
        cmd.append("--new-login")

    subprocess.run(cmd, cwd=SKILL_DIR)


def cmd_test(args):
    """运行审核测试"""
    sample_path = args.samples

    if not os.path.exists(sample_path):
        print(f"[ERROR] 样本文件不存在: {sample_path}")
        sys.exit(1)

    # 构建测试命令
    cmd = [
        sys.executable, os.path.join(SKILL_DIR, "main.py"),
        "run", "--samples", sample_path,
        "--concurrent", str(args.concurrent or 10),
    ]

    if args.services:
        cmd.extend(["--text-services", args.services])
    if args.format:
        cmd.extend(["--format", args.format])

    subprocess.run(cmd, cwd=SKILL_DIR)


def cmd_annotate(args):
    """人工标注"""
    results_file = args.file
    if not results_file:
        # 自动找到最新的结果文件
        latest = _find_latest_results()
        if not latest:
            print("[ERROR] 未找到测试结果文件")
            sys.exit(1)
        results_file = latest
        print(f"使用最新结果文件: {results_file}")

    cmd = [
        sys.executable, os.path.join(SKILL_DIR, "main.py"),
        "annotate", "--file", results_file,
    ]
    if args.annotator:
        cmd.extend(["--annotator", args.annotator])

    subprocess.run(cmd, cwd=SKILL_DIR)


def cmd_report(args):
    """生成对齐分析报告"""
    results_file = args.file
    if not results_file:
        latest = _find_latest_annotated()
        if not latest:
            print("[ERROR] 未找到已标注的结果文件")
            sys.exit(1)
        results_file = latest
        print(f"使用最新已标注文件: {results_file}")

    cmd = [
        sys.executable, os.path.join(SKILL_DIR, "main.py"),
        "report", "--file", results_file,
    ]
    if args.output:
        cmd.extend(["--output", args.output])

    subprocess.run(cmd, cwd=SKILL_DIR)


def cmd_status(args):
    """查看当前状态"""
    from scenarios import list_scenarios

    ensure_results_dir()
    state_path = os.path.join(RESULTS_DIR, "cst_state.json")

    print("=" * 60)
    print("  内容安全自动化测试 - 状态")
    print("=" * 60)

    # 凭证链状态
    try:
        from alibabacloud_credentials.client import Client as CredClient
        cred = CredClient()
        print(f"\n  凭证链: ✓ 已配置")
    except Exception:
        print(f"\n  凭证链: ✗ 未配置")

    # 初始化状态
    if os.path.exists(state_path):
        with open(state_path) as f:
            state = json.load(f)
        print(f"  初始化时间: {state.get('initialized_at', 'N/A')}")
        print(f"  验证状态: {'✓' if state.get('verified') else '✗'}")
    else:
        print(f"  初始化时间: 未初始化")
        print(f"  运行 python cst.py init 进行初始化")

    # 控制台登录状态
    if os.path.exists(STATE_FILE):
        print(f"  控制台登录: ✓ 有保存的会话")
    else:
        print(f"  控制台登录: 无保存的会话(首次需要手动登录)")

    # 可用场景
    print(f"\n  可用场景 ({len(list_scenarios())}):")
    for s in list_scenarios():
        print(f"    {s.id:30s} {s.name}")

    # 最近的结果文件
    if os.path.exists(RESULTS_DIR):
        files = sorted(os.listdir(RESULTS_DIR), reverse=True)[:5]
        if files:
            print(f"\n  最近的结果文件:")
            for f in files:
                full_path = os.path.join(RESULTS_DIR, f)
                size = os.path.getsize(full_path)
                size_str = f"{size/1024:.1f}KB" if size > 1024 else f"{size}B"
                print(f"    {f} ({size_str})")


def cmd_guardrail(args):
    """AI安全护栏效果测试(query_security_check / response_security_check)"""
    print("=" * 60)
    print("  AI安全护栏效果测试")
    print("=" * 60)

    cmd = [sys.executable, os.path.join(SKILL_DIR, "ai_guardrail.py")]
    if args.service:
        cmd.extend(["--service", args.service])
    if args.samples:
        cmd.extend(["--samples", args.samples])
    if args.output:
        cmd.extend(["--output", args.output])
    if getattr(args, "format", None):
        cmd.extend(["--format", args.format])

    subprocess.run(cmd, cwd=SKILL_DIR)


def cmd_full(args):
    """完整流程:控制台配置 → 测试 → 标注 → 报告"""
    print("\n" + "=" * 60)
    print("  完整工作流:配置 → 测试 → 标注 → 报告")
    print("=" * 60 + "\n")

    scenario = args.scenario
    samples = args.samples

    # Step 1: 控制台配置
    print(">>> Step 1/3: 控制台配置标签开关")
    configure_cmd = [
        sys.executable, os.path.join(SKILL_DIR, "console_automator.py"),
        "--scenario", scenario,
    ]
    rc = subprocess.run(configure_cmd, cwd=SKILL_DIR).returncode
    if rc != 0 and not args.skip_configure:
        print("控制台配置失败,是否继续测试?(y/N)")
        if input().lower() != "y":
            return

    # Step 2: 运行测试
    print("\n>>> Step 2/3: 运行审核测试")
    test_cmd = [
        sys.executable, os.path.join(SKILL_DIR, "main.py"),
        "run", "--samples", samples,
    ]
    custom_service = f"{args.base_service}_{scenario}"
    test_cmd.extend(["--text-services", custom_service])
    subprocess.run(test_cmd, cwd=SKILL_DIR)

    # Step 3: 标注(可选)
    print("\n>>> Step 3/3: 人工标注(可选)")
    if not args.skip_annotate:
        latest = _find_latest_results()
        if latest:
            print(f"找到最新结果: {latest}")
            annotate_cmd = [
                sys.executable, os.path.join(SKILL_DIR, "main.py"),
                "annotate", "--file", latest,
            ]
            if args.annotator:
                annotate_cmd.extend(["--annotator", args.annotator])
            subprocess.run(annotate_cmd, cwd=SKILL_DIR)


def _find_latest_results(pattern: str = "results_batch_*.json") -> str:
    """找到最新的结果文件"""
    import glob
    files = glob.glob(os.path.join(RESULTS_DIR, "results_batch_*.json"))
    if not files:
        # 尝试匹配任意JSON
        files = glob.glob(os.path.join(RESULTS_DIR, "results_*.json"))
    return sorted(files, reverse=True)[0] if files else ""


def _find_latest_annotated() -> str:
    """找到最新的已标注文件"""
    import glob
    files = glob.glob(os.path.join(RESULTS_DIR, "annotated_*.xlsx"))
    if not files:
        files = glob.glob(os.path.join(RESULTS_DIR, "annotated_*.json"))
    if not files:
        files = glob.glob(os.path.join(RESULTS_DIR, "annotated_*.csv"))
    return sorted(files, reverse=True)[0] if files else ""


def main():
    os.chdir(SKILL_DIR)  # 确保始终在skill目录中运行

    parser = argparse.ArgumentParser(
        description="内容安全自动化测试 - 一站式入口",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(dest="command")

    # init
    subparsers.add_parser("init", help="初始化:验证凭证链连通性")

    # configure
    conf = subparsers.add_parser("configure", help="控制台配置标签开关")
    conf.add_argument("scenario", help="场景ID")
    conf.add_argument("--yes", "-y", action="store_true", help="跳过确认")
    conf.add_argument("--headless", action="store_true", help="无头模式")
    conf.add_argument("--new-login", action="store_true", help="强制重新登录")

    # test
    test = subparsers.add_parser("test", help="运行审核测试")
    test.add_argument("samples", help="样本文件路径")
    test.add_argument("--services", help="指定服务列表,逗号分隔")
    test.add_argument("--concurrent", "-c", type=int, default=10, help="并发数")
    test.add_argument("--format", "-f", choices=["xlsx", "csv", "json"])

    # annotate
    ann = subparsers.add_parser("annotate", help="人工标注")
    ann.add_argument("--file", "-f", help="结果文件(默认使用最新)")
    ann.add_argument("--annotator", "-a", help="标注人姓名")

    # report
    rep = subparsers.add_parser("report", help="生成对齐分析报告")
    rep.add_argument("--file", "-f", help="已标注文件(默认使用最新)")
    rep.add_argument("--output", "-o", help="输出路径")

    # status
    subparsers.add_parser("status", help="查看当前状态")

    # guardrail
    guard = subparsers.add_parser("guardrail", help="AI安全护栏效果测试")
    guard.add_argument("--service", "-s", choices=["query", "response", "all"],
                       default="all", help="测试服务: query=输入检测, response=生成检测, all=全部")
    guard.add_argument("--samples", help="自定义样本JSON文件路径")
    guard.add_argument("--output", "-o", help="结果输出文件路径")
    guard.add_argument("--format", "-f", choices=["xlsx", "json"], default=None,
                       help="输出格式 (默认json,可选xlsx)")

    # full
    full = subparsers.add_parser("full", help="完整流程")
    full.add_argument("scenario", help="场景ID")
    full.add_argument("samples", help="样本文件路径")
    full.add_argument("--base-service", default="ugc_moderation_byllm",
                      help="基础Service名")
    full.add_argument("--annotator", "-a", help="标注人姓名")
    full.add_argument("--skip-configure", action="store_true", help="跳过控制台配置")
    full.add_argument("--skip-annotate", action="store_true", help="跳过标注")

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        return

    cmd_map = {
        "init": cmd_init,
        "configure": cmd_configure,
        "test": cmd_test,
        "annotate": cmd_annotate,
        "report": cmd_report,
        "status": cmd_status,
        "guardrail": cmd_guardrail,
        "full": cmd_full,
    }

    cmd_map[args.command](args)


if __name__ == "__main__":
    main()