文件预览

security_review.py

查看 Skill Doc Formatter 技能包中的文件内容。

文件内容

scripts/security_review.py

#!/usr/bin/env python3
"""
Security review checker for OpenClaw skills.
Scans skills for common security issues that affect ClawHub security reviews.
"""

import argparse
import json
import re
import sys
from pathlib import Path
from typing import Dict, List, Tuple


class SecurityReviewer:
    def __init__(self, skill_dir: Path):
        self.skill_dir = Path(skill_dir)
        self.issues = []
        self.warnings = []
        self.suggestions = []

    def check_requirements_declaration(self):
        """Check if runtime dependencies are declared in SKILL.md or _meta.json."""
        skill_md = self.skill_dir / "SKILL.md"
        meta_json = self.skill_dir / "_meta.json"
        
        # Read SKILL.md
        skill_content = ""
        if skill_md.exists():
            skill_content = skill_md.read_text(encoding="utf-8")
        
        # Read _meta.json
        meta = {}
        if meta_json.exists():
            try:
                meta = json.loads(meta_json.read_text(encoding="utf-8"))
            except:
                pass
        
        # Check for common CLI/system dependencies in scripts
        scripts_dir = self.skill_dir / "scripts"
        found_deps = set()
        
        if scripts_dir.exists():
            for script_file in scripts_dir.glob("*.py"):
                content = script_file.read_text(encoding="utf-8")
                # Check for subprocess calls to system tools
                if re.search(r"subprocess\.(run|call|Popen|check_output)", content):
                    # Look for common system tools in command arrays or strings
                    for tool in ["openclaw", "lsof", "ps", "launchctl", "curl", "wget", "git", "which", "kill", "killall"]:
                        # Check for tool in command arrays or subprocess calls
                        if re.search(rf"['\"]({tool})['\"]|\[.*['\"]({tool})['\"]", content, re.IGNORECASE):
                            found_deps.add(tool)
            
            # Also check shell scripts
            for script_file in scripts_dir.glob("*.sh"):
                content = script_file.read_text(encoding="utf-8")
                for tool in ["openclaw", "lsof", "ps", "launchctl", "curl", "wget", "git", "which", "kill", "killall"]:
                    if re.search(rf"\\b{tool}\\b", content, re.IGNORECASE):
                        found_deps.add(tool)
        
        # Check if Requirements section exists
        has_requirements = bool(re.search(r"##\s+Requirements?", skill_content, re.IGNORECASE))
        
        # Check if dependencies are mentioned in Requirements section
        deps_mentioned = False
        if has_requirements and found_deps:
            # Extract Requirements section content
            req_match = re.search(r"##\s+Requirements?.*?##", skill_content, re.IGNORECASE | re.DOTALL)
            if req_match:
                req_content = req_match.group(0).lower()
                deps_mentioned = all(dep.lower() in req_content for dep in found_deps)
        
        if found_deps:
            if not has_requirements:
                self.issues.append({
                    "type": "missing_requirements",
                    "severity": "medium",
                    "message": f"Skill uses system dependencies ({', '.join(sorted(found_deps))}) but no Requirements section found in SKILL.md",
                    "fix": "Add a ## Requirements section listing all CLI tools and system dependencies"
                })
            elif not deps_mentioned:
                self.warnings.append({
                    "type": "requirements_incomplete",
                    "severity": "low",
                    "message": f"Skill uses system dependencies ({', '.join(sorted(found_deps))}) but they're not all mentioned in Requirements section",
                    "fix": "Update Requirements section to list all CLI tools and system dependencies used by the skill"
                })

    def check_secret_logging(self):
        """Check for secrets being logged to files."""
        scripts_dir = self.skill_dir / "scripts"
        if not scripts_dir.exists():
            return
        
        for script_file in scripts_dir.glob("*.py"):
            content = script_file.read_text(encoding="utf-8")
            lines = content.split("\n")
            
            # Look for log file paths
            log_paths = []
            for i, line in enumerate(lines, 1):
                # Find log file paths
                match = re.search(r"['\"]([^'\"]+\.log)['\"]", line)
                if match:
                    log_paths.append((i, match.group(1)))
            
            # Check each log file usage
            for log_line_num, log_path in log_paths:
                # Look for writes to this log file
                for i, line in enumerate(lines, 1):
                    if log_path in line and ("write" in line.lower() or "logf" in line or "log_file" in line.lower() or "open" in line.lower()):
                        # Check if command with secrets is being written
                        # Look ahead and behind a few lines for command construction
                        context_start = max(0, i - 15)
                        context_end = min(len(lines), i + 10)
                        context = "\n".join(lines[context_start:context_end])
                        
                        # Pattern 1: Writing joined command array (e.g., ' '.join(cmd))
                        if re.search(r"['\"].*join\(.*cmd|' '.join\(.*cmd|\+.*cmd", context, re.IGNORECASE):
                            # Check if cmd contains secrets
                            if re.search(r"cmd.*--(token|password|auth)|--(token|password|auth).*cmd", context, re.IGNORECASE):
                                self.issues.append({
                                    "type": "secret_logging",
                                    "severity": "high",
                                    "message": f"{script_file.name}:{i} - Full command with secrets written to log file '{log_path}'",
                                    "fix": "Do not log full command strings containing secrets. Log only command name and masked arguments (e.g., log cmd[0] + '***' for args)"
                                })
                        
                        # Pattern 2: Direct write of command string with secrets
                        if re.search(r"write.*\(.*['\"].*--(token|password|auth)", line, re.IGNORECASE):
                            self.issues.append({
                                "type": "secret_logging",
                                "severity": "high",
                                "message": f"{script_file.name}:{i} - Command with secrets written to log file '{log_path}'",
                                "fix": "Do not log full command strings containing secrets. Log only command name and masked arguments"
                            })
                        
                        # Pattern 3: subprocess.Popen with stdout/stderr to log file and cmd contains secrets
                        if "subprocess" in context.lower() and ("Popen" in context or "run" in context):
                            # Check if cmd variable is passed and contains secrets
                            cmd_context = "\n".join(lines[max(0, i-20):min(len(lines), i+5)])
                            if re.search(r"cmd.*=.*\[.*--(token|password|auth)", cmd_context, re.IGNORECASE):
                                # Only flag if stdout/stderr are actually sent to the log (not DEVNULL)
                                if "devnull" in context.lower() and "stdout" in context.lower() and "stderr" in context.lower():
                                    continue  # Safe: output discarded
                                if "stdout" in line.lower() or "stderr" in line.lower() or log_path in line:
                                    self.issues.append({
                                        "type": "secret_logging",
                                        "severity": "high",
                                        "message": f"{script_file.name}:{i} - Command with secrets passed to subprocess with output to log file '{log_path}'",
                                        "fix": "Do not pass commands with secrets to subprocess when stdout/stderr goes to log files. Mask secrets in command or redirect output elsewhere"
                                    })
                        
                        # Pattern 4: Direct secret/token/password variable in write statement
                        if re.search(r"write.*\(.*(secret|token|password)", line, re.IGNORECASE):
                            self.issues.append({
                                "type": "secret_logging",
                                "severity": "high",
                                "message": f"{script_file.name}:{i} - Potential secret value written to log file '{log_path}'",
                                "fix": "Mask secrets before logging (e.g., log only hash or '***') or avoid logging command arguments containing secrets"
                            })

    def check_missing_files(self):
        """Check for referenced files that don't exist in the skill."""
        scripts_dir = self.skill_dir / "scripts"
        if not scripts_dir.exists():
            return
        
        for script_file in scripts_dir.glob("*.sh"):
            content = script_file.read_text(encoding="utf-8")
            
            # Look for file references
            for match in re.finditer(r"([\"'])([^\"']+\.(plist|json|py|sh))\\1", content):
                ref_file = match.group(2)
                # Check if it's a relative path
                if not ref_file.startswith("/") and not ref_file.startswith("$"):
                    full_path = self.skill_dir / ref_file
                    if not full_path.exists():
                        # Check if it's in scripts/
                        script_path = scripts_dir / ref_file
                        if not script_path.exists():
                            self.issues.append({
                                "type": "missing_file",
                                "severity": "medium",
                                "message": f"{script_file.name} references '{ref_file}' which doesn't exist",
                                "fix": f"Add the missing file or update the reference"
                            })

    def check_env_vars_declaration(self):
        """Check if environment variables are declared."""
        scripts_dir = self.skill_dir / "scripts"
        skill_md = self.skill_dir / "SKILL.md"
        
        if not scripts_dir.exists():
            return
        
        # Find env vars used in scripts
        found_env_vars = set()
        for script_file in scripts_dir.glob("*.py"):
            content = script_file.read_text(encoding="utf-8")
            for match in re.finditer(r"os\.environ\.get\(['\"]([^'\"]+)['\"]", content):
                found_env_vars.add(match.group(1))
            for match in re.finditer(r"os\.getenv\(['\"]([^'\"]+)['\"]", content):
                found_env_vars.add(match.group(1))
        
        if found_env_vars:
            doc_content = ""
            if skill_md.exists():
                doc_content += skill_md.read_text(encoding="utf-8") + "\n"
            readme = self.skill_dir / "README.md"
            if readme.exists():
                doc_content += readme.read_text(encoding="utf-8")
            # Check if env vars are mentioned in SKILL.md or README
            env_vars_mentioned = any(
                re.search(rf"\b{re.escape(var)}\b", doc_content, re.IGNORECASE)
                for var in found_env_vars
            )
            
            if not env_vars_mentioned:
                self.warnings.append({
                    "type": "env_vars_not_declared",
                    "severity": "low",
                    "message": f"Scripts use environment variables ({', '.join(sorted(found_env_vars))}) but they're not documented",
                    "fix": "Document required environment variables in SKILL.md Requirements section"
                })

    def check_persistent_behavior(self):
        """Check for LaunchAgent/daemon behavior without always:true."""
        scripts_dir = self.skill_dir / "scripts"
        meta_json = self.skill_dir / "_meta.json"
        
        if not scripts_dir.exists():
            return
        
        # Check for LaunchAgent install scripts
        has_launchagent = False
        for script_file in scripts_dir.glob("*.sh"):
            content = script_file.read_text(encoding="utf-8")
            if "launchctl" in content.lower() or "plist" in content.lower():
                has_launchagent = True
                break
        
        if has_launchagent:
            meta = {}
            if meta_json.exists():
                try:
                    meta = json.loads(meta_json.read_text(encoding="utf-8"))
                except:
                    pass
            
            if not meta.get("always"):
                self.warnings.append({
                    "type": "persistent_behavior",
                    "severity": "low",
                    "message": "Skill installs LaunchAgent/daemon but _meta.json doesn't have 'always': true",
                    "fix": "Add 'always': true to _meta.json if the skill runs persistently"
                })

    def check_file_permissions(self):
        """Check for log files that might need restricted permissions."""
        scripts_dir = self.skill_dir / "scripts"
        if not scripts_dir.exists():
            return
        
        for script_file in scripts_dir.glob("*.py"):
            content = script_file.read_text(encoding="utf-8")
            
            # Look for log file creation
            for match in re.finditer(r"open\(['\"]([^'\"]+\.log)['\"]", content):
                log_file = match.group(1)
                # Check if permissions are set
                context_lines = content[:content.find(match.group(0))].split("\n")
                has_permissions = any("chmod" in line or "os.chmod" in line for line in context_lines[-10:])
                
                if not has_permissions and "secret" in log_file.lower() or "auth" in log_file.lower():
                    self.suggestions.append({
                        "type": "log_file_permissions",
                        "severity": "low",
                        "message": f"{script_file.name} creates log file '{log_file}' - consider restricting permissions",
                        "fix": "Set restrictive permissions on log files containing sensitive data: os.chmod(log_path, 0o600)"
                    })

    def check_metadata_docs_env_consistency(self):
        """Check that _meta.json env/optionalEnv matches SKILL.md and README (required vs optional)."""
        meta_json = self.skill_dir / "_meta.json"
        if not meta_json.exists():
            return
        try:
            meta = json.loads(meta_json.read_text(encoding="utf-8"))
        except Exception:
            return
        requires = meta.get("requires") or {}
        required_env = set(requires.get("env") or [])
        optional_env = set(requires.get("optionalEnv") or [])
        all_declared = required_env | optional_env
        if not all_declared:
            return
        doc_content = ""
        for name in ("SKILL.md", "README.md"):
            p = self.skill_dir / name
            if p.exists():
                doc_content += "\n" + p.read_text(encoding="utf-8")
        for var in all_declared:
            if not re.search(rf"\b{re.escape(var)}\b", doc_content, re.IGNORECASE):
                continue
            in_required = var in required_env
            in_optional = var in optional_env
            # Look at a window around each occurrence of the var to avoid section headers like "Required:" + list with optional items
            says_optional = False
            says_required = False
            for m in re.finditer(rf"\b{re.escape(var)}\b", doc_content, re.IGNORECASE):
                start = max(0, m.start() - 80)
                end = min(len(doc_content), m.end() + 80)
                window = doc_content[start:end]
                if re.search(r"optional|not required|if unset|defaults to", window, re.IGNORECASE):
                    says_optional = True
                # Only treat as "docs say required" if var is explicitly marked required, e.g. "(required)" or "is required"
                if re.search(r"\((required|mandatory)\)|is (required|mandatory)\b|\b(required|mandatory)\s*:", window, re.IGNORECASE):
                    if not re.search(r"optional|not required", window, re.IGNORECASE):
                        says_required = True
            if in_required and says_optional:
                self.issues.append({
                    "type": "metadata_docs_mismatch",
                    "severity": "medium",
                    "message": f"_meta.json lists {var} in required 'env' but docs describe it as optional",
                    "fix": "Either move " + var + " to optionalEnv in _meta.json or remove 'optional'/'not required' from SKILL.md/README"
                })
            if in_optional and says_required:
                self.warnings.append({
                    "type": "metadata_docs_mismatch",
                    "severity": "low",
                    "message": f"_meta.json lists {var} in optionalEnv but docs say it is required/mandatory",
                    "fix": "Either add " + var + " to requires.env in _meta.json or state in SKILL.md/README that it is optional (e.g. defaults to ~/.openclaw)"
                })

    def check_openclaw_json_read_disclosure(self):
        """If scripts read openclaw.json, docs must disclose it and recommend verifying read access."""
        scripts_dir = self.skill_dir / "scripts"
        if not scripts_dir.exists():
            return
        reads_openclaw_json = False
        for script_file in scripts_dir.glob("*.py"):
            content = script_file.read_text(encoding="utf-8")
            if "openclaw.json" in content or ("openclaw" in content and "json" in content and ("open(" in content or "read" in content.lower())):
                reads_openclaw_json = True
                break
        if not reads_openclaw_json:
            return
        doc_content = ""
        for name in ("SKILL.md", "README.md"):
            p = self.skill_dir / name
            if p.exists():
                doc_content += "\n" + p.read_text(encoding="utf-8")
        # Must mention reading openclaw.json (or tools.exec)
        discloses_read = (
            "openclaw.json" in doc_content
            and ("tools.exec" in doc_content or "read" in doc_content.lower() or "access" in doc_content.lower())
        )
        trust_note = bool(
            re.search(r"comfortable granting read access|verify you are comfortable|before installing", doc_content, re.IGNORECASE)
        )
        if not discloses_read:
            self.issues.append({
                "type": "openclaw_json_not_disclosed",
                "severity": "medium",
                "message": "Scripts read openclaw.json but SKILL.md/README do not clearly disclose it and which fields are used",
                "fix": "Document in SKILL.md/README that the skill reads openclaw.json (e.g. tools.exec.host/node only) and only those fields"
            })
        elif not trust_note:
            self.warnings.append({
                "type": "openclaw_json_trust_note_missing",
                "severity": "low",
                "message": "Skill reads openclaw.json but docs don't ask installers to verify they're comfortable granting read access",
                "fix": "Add a 'Before installing' or Requirements note: 'Verify you are comfortable granting read access to that file.'"
            })

    def check_cli_safe_subprocess_warning(self):
        """If docs show CLI examples with user-supplied args, they must warn to use subprocess with list args (no shell)."""
        doc_content = ""
        for name in ("SKILL.md", "README.md"):
            p = self.skill_dir / name
            if p.exists():
                doc_content += "\n" + p.read_text(encoding="utf-8")
        # CLI examples that pass a quoted string (user message / task) as last or only arg
        has_user_arg_cli = bool(
            re.search(r"(python|python3).*\.py\s+[^\n]*--(json|spawn|classify)\s+[\"'].*[\"']", doc_content)
            or re.search(r"spawn\s+--json\s+[\"']", doc_content)
        )
        safe_keywords = (
            "subprocess" in doc_content
            and ("list of arguments" in doc_content or "list arguments" in doc_content or "shell interpolation" in doc_content.lower()
                 or "manual/cli use only" in doc_content.lower() or "manual use only" in doc_content.lower()
                 or "do not build a shell command" in doc_content.lower())
        )
        if has_user_arg_cli and not safe_keywords:
            self.warnings.append({
                "type": "cli_safe_subprocess_warning_missing",
                "severity": "medium",
                "message": "Docs show CLI examples with user-supplied task/message in quotes but no warning to use subprocess with list args for programmatic use",
                "fix": "Add above Commands/CLI: 'Manual/CLI use only. From code with user input, use subprocess.run(..., [..., user_message], ...) with a list of arguments; do not build a shell command string.'"
            })

    def run_all_checks(self):
        """Run all security checks."""
        self.check_requirements_declaration()
        self.check_secret_logging()
        self.check_missing_files()
        self.check_env_vars_declaration()
        self.check_persistent_behavior()
        self.check_file_permissions()
        self.check_metadata_docs_env_consistency()
        self.check_openclaw_json_read_disclosure()
        self.check_cli_safe_subprocess_warning()

    def generate_report(self, json_output: bool = False) -> str:
        """Generate a security review report."""
        self.run_all_checks()
        
        if json_output:
            return json.dumps({
                "issues": self.issues,
                "warnings": self.warnings,
                "suggestions": self.suggestions,
                "summary": {
                    "total_issues": len(self.issues),
                    "total_warnings": len(self.warnings),
                    "total_suggestions": len(self.suggestions),
                    "high_severity": len([i for i in self.issues if i["severity"] == "high"]),
                    "medium_severity": len([i for i in self.issues if i["severity"] == "medium"]),
                }
            }, indent=2)
        
        # Text report
        lines = []
        lines.append("=" * 70)
        lines.append("SECURITY REVIEW REPORT")
        lines.append("=" * 70)
        lines.append("")
        
        if not self.issues and not self.warnings and not self.suggestions:
            lines.append("✓ No security issues found!")
            lines.append("")
            return "\n".join(lines)
        
        if self.issues:
            lines.append(f"ISSUES ({len(self.issues)}):")
            lines.append("-" * 70)
            for issue in self.issues:
                lines.append(f"[{issue['severity'].upper()}] {issue['type']}")
                lines.append(f"  {issue['message']}")
                lines.append(f"  Fix: {issue['fix']}")
                lines.append("")
        
        if self.warnings:
            lines.append(f"WARNINGS ({len(self.warnings)}):")
            lines.append("-" * 70)
            for warning in self.warnings:
                lines.append(f"[{warning['severity'].upper()}] {warning['type']}")
                lines.append(f"  {warning['message']}")
                lines.append(f"  Fix: {warning['fix']}")
                lines.append("")
        
        if self.suggestions:
            lines.append(f"SUGGESTIONS ({len(self.suggestions)}):")
            lines.append("-" * 70)
            for suggestion in self.suggestions:
                lines.append(f"[{suggestion['severity'].upper()}] {suggestion['type']}")
                lines.append(f"  {suggestion['message']}")
                lines.append(f"  Fix: {suggestion['fix']}")
                lines.append("")
        
        lines.append("=" * 70)
        return "\n".join(lines)


def main():
    ap = argparse.ArgumentParser(description="Security review checker for OpenClaw skills")
    ap.add_argument("skill_dir", type=Path, help="Path to skill directory")
    ap.add_argument("--json", action="store_true", help="Output JSON format")
    args = ap.parse_args()
    
    skill_dir = Path(args.skill_dir)
    if not skill_dir.exists():
        print(f"Error: skill directory not found: {skill_dir}", file=sys.stderr)
        sys.exit(1)
    
    reviewer = SecurityReviewer(skill_dir)
    report = reviewer.generate_report(json_output=args.json)
    print(report)
    
    # Exit with error code if there are high-severity issues
    if reviewer.issues:
        high_severity = [i for i in reviewer.issues if i["severity"] == "high"]
        if high_severity:
            sys.exit(1)


if __name__ == "__main__":
    main()