文件预览

ggshield_skill.py

查看 ggshield Secret Scanner 技能包中的文件内容。

文件内容

ggshield_skill.py

"""
ggshield Secret Scanner Skill for Moltbot

Detects 500+ types of hardcoded secrets (API keys, credentials, certificates)
before they're committed to git.

Author: GitGuardian Team
Version: 1.0.0
"""

import asyncio
import os
import subprocess


class GGShieldSkill:
    """
    Moltbot skill that wraps the ggshield CLI for secret scanning.

    Provides methods to scan repositories, files, staged changes, and Docker
    images for hardcoded secrets using GitGuardian's detection engine, and to
    install ggshield as a git hook.

    All public ``async`` methods return a user-facing message string and never
    raise for expected failures (missing binary, missing API key, bad input).
    The blocking ggshield subprocess is executed in a worker thread so the
    event loop stays responsive while a scan is running.
    """

    # ggshield exit codes: 0 means the command succeeded, 1 means a scan
    # completed and reported findings. Every other non-zero code (usage error,
    # authentication error, unexpected error) means the command itself failed
    # and must not be presented to the user as "secrets found".
    _EXIT_SUCCESS = 0
    _EXIT_SECRETS_FOUND = 1

    def __init__(self):
        """Initialize the ggshield skill."""
        self.name = "ggshield"
        self.version = "1.0.0"
        self.requires_api_key = True
        self.api_key_env = "GITGUARDIAN_API_KEY"

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    def _get_api_key(self) -> str:
        """
        Get the GitGuardian API key from the environment.

        Returns:
            The non-empty value of the ``self.api_key_env`` variable.

        Raises:
            ValueError: If the variable is unset or empty.
        """
        api_key = os.environ.get(self.api_key_env)
        if not api_key:
            raise ValueError(
                f"{self.api_key_env} not set. "
                "Get one at https://dashboard.gitguardian.com"
            )
        return api_key

    def _run_ggshield(self, *args: str) -> tuple[int, str, str]:
        """
        Run a ggshield CLI command (blocking) with the API key in its environment.

        Args:
            *args: Arguments to pass to ggshield (e.g., 'secret', 'scan', 'repo', '.')

        Returns:
            Tuple of (exit_code, stdout, stderr)

        Raises:
            FileNotFoundError: If ggshield is not installed
            ValueError: If the API key environment variable is not set
        """
        api_key = self._get_api_key()
        env = {**os.environ, self.api_key_env: api_key}

        result = subprocess.run(
            ["ggshield", *args],
            capture_output=True,
            text=True,
            # Undecodable bytes in tool output must not abort the scan report.
            errors="replace",
            env=env,
        )
        return result.returncode, result.stdout, result.stderr

    async def _run_guarded(self, args: tuple[str, ...], describe) -> str:
        """
        Run ggshield off the event loop and turn the outcome into a message.

        Args:
            args: Arguments to pass to ggshield.
            describe: Callable ``(exit_code, stdout, stderr) -> str`` invoked
                with the process result when ggshield could be started.

        Returns:
            The message produced by ``describe``, or a formatted error message
            when ggshield is missing, the API key is not set, or the process
            could not be run for another reason.
        """
        try:
            # subprocess.run blocks; a worker thread keeps the loop free.
            outcome = await asyncio.to_thread(self._run_ggshield, *args)
        except FileNotFoundError:
            return self._format_error(
                "ggshield not installed. Run: pip install ggshield"
            )
        except ValueError as e:
            return self._format_error(str(e))
        except Exception as e:
            return self._format_error(f"Unexpected error: {e}")
        return describe(*outcome)

    async def _scan(
        self, args: tuple[str, ...], clean_message: str, found_header: str
    ) -> str:
        """Run a scan command and report it via ``_describe_scan``."""

        def describe(exit_code: int, stdout: str, stderr: str) -> str:
            return self._describe_scan(
                exit_code, stdout, stderr, clean_message, found_header
            )

        return await self._run_guarded(args, describe)

    def _describe_scan(
        self,
        exit_code: int,
        stdout: str,
        stderr: str,
        clean_message: str,
        found_header: str,
    ) -> str:
        """
        Translate a finished scan into a user-facing message.

        Args:
            exit_code: ggshield process exit code.
            stdout: Captured standard output.
            stderr: Captured standard error.
            clean_message: Success text used when the exit code is 0.
            found_header: Text placed before the scan report when the exit
                code is 1 (findings reported).

        Returns:
            A success message, a "secrets found" message with the scan report,
            or a "ggshield failed" message for any other exit code.
        """
        if exit_code == self._EXIT_SUCCESS:
            return self._format_success(clean_message)

        if exit_code == self._EXIT_SECRETS_FOUND:
            output = stdout if stdout else stderr
            return self._format_error(f"{found_header}:\n{output}")

        # Anything else is a tool failure, not a finding: surface the
        # diagnostics (stderr first) instead of claiming secrets were found.
        details = (stderr if stderr else stdout).strip()
        message = f"ggshield failed (exit code {exit_code})"
        if details:
            message = f"{message}:\n{details}"
        return self._format_error(message)

    def _diagnose_docker_failure(self, image: str, stdout: str, stderr: str):
        """
        Guess a friendlier explanation for a failed Docker scan.

        Only meant for failed runs (exit code other than 0 or 1); the match is
        a best-effort substring heuristic on the tool output.

        Args:
            image: Image name that was scanned.
            stdout: Captured standard output.
            stderr: Captured standard error.

        Returns:
            An explanation string, or ``None`` when the output matches no
            known Docker problem.
        """
        combined_output = f"{stdout}\n{stderr}".lower()
        # The image name may be echoed in the output and can itself contain
        # "docker" (e.g. "docker.io/library/nginx"); ignore it when deciding
        # whether the message is about the Docker tool.
        tool_output = combined_output.replace(image.lower(), "")

        if "docker" in tool_output and (
            "not found" in tool_output
            or "cannot connect" in tool_output
            or "is not running" in tool_output
        ):
            return "Docker is not available. Make sure Docker is installed and running."

        if "no such image" in combined_output or "not found" in combined_output:
            return (
                f"Docker image not found: {image}. "
                "Make sure the image exists locally or pull it first."
            )

        return None

    @staticmethod
    def _as_cli_path(path):
        """
        Return ``path`` in a form ggshield cannot mistake for an option.

        A relative path starting with '-' is prefixed with './'; everything
        else is returned unchanged.
        """
        if isinstance(path, str) and path.startswith("-"):
            return os.path.join(".", path)
        return path

    def _format_success(self, message: str) -> str:
        """Format success message with emoji."""
        return f"✅ {message}"

    def _format_error(self, message: str) -> str:
        """Format error message with emoji."""
        return f"❌ {message}"

    def _format_scanning(self, message: str) -> str:
        """Format scanning message with emoji."""
        return f"🔍 {message}"

    def _is_git_repository(self, path: str = ".") -> bool:
        """
        Check if the given path is inside a git repository.

        Args:
            path: Directory to check; defaults to the current directory.

        Returns:
            True if ``path`` contains a ``.git`` entry or ``git rev-parse``
            succeeds there; False otherwise, including when git is not
            installed or ``path`` cannot be used as a working directory.
        """
        if os.path.exists(os.path.join(path, ".git")):
            return True
        # Also check if we're in a subdirectory of a git repo
        try:
            result = subprocess.run(
                ["git", "rev-parse", "--git-dir"],
                capture_output=True,
                text=True,
                cwd=path,
            )
            return result.returncode == 0
        except (FileNotFoundError, OSError):
            return False

    # ------------------------------------------------------------------
    # Public skill actions
    # ------------------------------------------------------------------

    async def scan_repo(self, path: str) -> str:
        """
        Scan entire git repository for secrets.

        Args:
            path: Path to repository root

        Returns:
            User-facing message describing results
        """
        if not os.path.exists(path):
            return self._format_error(f"Path not found: {path}")

        if not os.path.isdir(path):
            return self._format_error(f"Not a directory: {path}")

        return await self._scan(
            ("secret", "scan", "repo", self._as_cli_path(path)),
            f"Repository clean: {path}",
            "Secrets found in repository",
        )

    async def scan_file(self, path: str) -> str:
        """
        Scan a single file for secrets.

        Args:
            path: Path to file to scan

        Returns:
            User-facing message describing results
        """
        if not os.path.exists(path):
            return self._format_error(f"File not found: {path}")

        if not os.path.isfile(path):
            return self._format_error(f"Not a file: {path}")

        return await self._scan(
            ("secret", "scan", "path", self._as_cli_path(path)),
            f"File clean: {path}",
            f"Secrets found in {path}",
        )

    async def scan_staged(self) -> str:
        """
        Scan only staged git changes (pre-commit mode).

        Fast scanning of what's about to be committed. Requires git repository.

        Returns:
            User-facing message describing results
        """
        if not self._is_git_repository():
            return self._format_error(
                "Not in a git repository. Run from repo root."
            )

        return await self._scan(
            ("secret", "scan", "pre-commit"),
            "Staged changes are clean",
            "Secrets detected in staged changes",
        )

    async def install_hooks(self, hook_type: str = "pre-commit") -> str:
        """
        Install ggshield as a git hook in the current repository.

        After installation, every commit (or push) will be scanned automatically.

        Args:
            hook_type: Type of hook to install: 'pre-commit' or 'pre-push'

        Returns:
            User-facing message describing installation result
        """
        valid_hooks = ("pre-commit", "pre-push")
        if hook_type not in valid_hooks:
            return self._format_error(
                f"Invalid hook type: {hook_type}. Must be one of: {', '.join(valid_hooks)}"
            )

        if not self._is_git_repository():
            return self._format_error(
                "Not in a git repository. Run from repo root."
            )

        def describe(exit_code: int, stdout: str, stderr: str) -> str:
            if exit_code == self._EXIT_SUCCESS:
                return self._format_success(
                    f"Installed {hook_type} hook.\n"
                    f"From now on, commits with secrets will be blocked."
                )
            # Installation errors are written to stderr; prefer it.
            output = stderr if stderr else stdout
            return self._format_error(f"Failed to install hook: {output}")

        return await self._run_guarded(
            ("install", "--mode", "local", "--hook-type", hook_type), describe
        )

    async def scan_docker(self, image: str) -> str:
        """
        Scan Docker image for secrets in its layers.

        Args:
            image: Docker image name/tag (e.g., 'myapp:latest')

        Returns:
            User-facing message describing results
        """
        if not image or not image.strip():
            return self._format_error("Docker image name is required")

        image = image.strip()

        # An image reference never starts with '-'; such a value would be
        # parsed by ggshield as a command-line option instead of an image.
        if image.startswith("-"):
            return self._format_error(f"Invalid Docker image name: {image}")

        def describe(exit_code: int, stdout: str, stderr: str) -> str:
            # Only a failed run is inspected for Docker problems. A completed
            # scan (exit code 0 or 1) is reported as-is so that a findings
            # report containing e.g. "not found" is never hidden.
            if exit_code not in (self._EXIT_SUCCESS, self._EXIT_SECRETS_FOUND):
                diagnosis = self._diagnose_docker_failure(image, stdout, stderr)
                if diagnosis is not None:
                    return self._format_error(diagnosis)
            return self._describe_scan(
                exit_code,
                stdout,
                stderr,
                f"Docker image {image} is clean",
                f"Secrets found in Docker image {image}",
            )

        return await self._run_guarded(("secret", "scan", "docker", image), describe)