文件内容
scripts/integrity.py
#!/usr/bin/env python3
"""
OpenClaw Warden Pro — Workspace Integrity Verification
Detects unauthorized modifications and prompt injection patterns
in agent workspace files. Includes countermeasures: snapshot restore,
skill quarantine, and git rollback.
Usage:
integrity.py baseline [--workspace PATH]
integrity.py verify [--workspace PATH]
integrity.py scan [--workspace PATH]
integrity.py full [--workspace PATH]
integrity.py status [--workspace PATH]
integrity.py accept FILE [--workspace PATH]
integrity.py restore FILE [--workspace PATH]
integrity.py quarantine SKILL [--workspace PATH]
integrity.py unquarantine SKILL [--workspace PATH]
integrity.py rollback FILE [--workspace PATH]
integrity.py protect [--workspace PATH]
"""
import hashlib
import io
import json
import os
import re
import sys
import difflib
from datetime import datetime, timezone
from pathlib import Path
# Ensure stdout can handle Unicode on Windows (cp1252 etc.)
if sys.stdout.encoding and sys.stdout.encoding.lower() not in ("utf-8", "utf8"):
sys.stdout = io.TextIOWrapper(
sys.stdout.buffer, encoding="utf-8", errors="replace"
)
sys.stderr = io.TextIOWrapper(
sys.stderr.buffer, encoding="utf-8", errors="replace"
)
MANIFEST_VERSION = 1
INTEGRITY_DIR = ".integrity"
MANIFEST_FILE = "manifest.json"
SNAPSHOTS_DIR = "snapshots"
QUARANTINE_PREFIX = ".quarantined-"
# ---------------------------------------------------------------------------
# File categories and their default severity on unexpected change
# ---------------------------------------------------------------------------
CRITICAL_FILES = {
"SOUL.md", "AGENTS.md", "IDENTITY.md", "USER.md",
"TOOLS.md", "HEARTBEAT.md",
}
MEMORY_PATTERNS = ["memory/*.md", "MEMORY.md"]
CONFIG_PATTERNS = ["*.json"]
SKILL_PATTERNS = ["skills/*/SKILL.md"]
SEVERITY_WARNING = "WARNING"
SEVERITY_INFO = "INFO"
SEVERITY_CRITICAL = "CRITICAL"
# ---------------------------------------------------------------------------
# Injection detection patterns
# ---------------------------------------------------------------------------
INSTRUCTION_OVERRIDE_PATTERNS = [
r"(?i)ignore\s+(all\s+)?previous\s+instructions",
r"(?i)disregard\s+(all\s+)?(above|previous|prior)",
r"(?i)forget\s+(all\s+)?your\s+instructions",
r"(?i)you\s+are\s+now\s+(?!(?:ready|going|able))",
r"(?i)new\s+system\s+prompt",
r"(?i)override\s+(all\s+)?(previous|safety|existing)\s+(instructions|rules|guidelines)",
r"(?i)act\s+as\s+if\s+you\s+(have\s+)?no\s+(restrictions|rules|guidelines|limits)",
r"(?i)from\s+now\s+on[,\s]+you\s+(will|must|should)",
r"(?i)entering\s+(a\s+)?(new|special|developer|admin)\s+mode",
r"(?i)execute\s+the\s+following\s+(commands?|instructions?|code)\s*(:|without)",
]
SYSTEM_PROMPT_MARKERS = [
r"<\s*system\s*>",
r"\[\s*SYSTEM\s*\]",
r"<<\s*SYS\s*>>",
r"<\s*\|im_start\|>\s*system",
r"\[INST\]",
r"###\s*(?:System|Assistant|Human)\s*:",
]
HTML_INJECTION_PATTERNS = [
r"<\s*script[\s>]",
r"<\s*iframe[\s>]",
r"<\s*object[\s>]",
r"<\s*embed[\s>]",
r"<\s*link\s[^>]*rel\s*=\s*[\"']?import",
r"style\s*=\s*[\"'][^\"']*display\s*:\s*none",
r"<\s*div[^>]*hidden",
r"<\s*img\s[^>]*onerror\s*=",
]
EXFIL_IMAGE_PATTERN = (
r"!\[[^\]]*\]\(\s*https?://[^)]*"
r"(?:[?&][a-zA-Z0-9_]+=(?:[A-Za-z0-9+/]{20,}={0,2}|[0-9a-fA-F]{20,}))"
)
BASE64_BLOB_PATTERN = r"(?<![A-Za-z0-9+/])[A-Za-z0-9+/]{60,}={0,2}(?![A-Za-z0-9+/])"
UNICODE_TRICKS = [
"\u200b", # zero-width space
"\u200c", # zero-width non-joiner
"\u200d", # zero-width joiner
"\u2060", # word joiner
"\u2062", # invisible times
"\u2063", # invisible separator
"\ufeff", # zero-width no-break space / BOM
"\u202a", # LTR embedding
"\u202b", # RTL embedding
"\u202c", # pop directional formatting
"\u202d", # LTR override
"\u202e", # RTL override
"\u2066", # LTR isolate
"\u2067", # RTL isolate
"\u2068", # first strong isolate
"\u2069", # pop directional isolate
"\u00ad", # soft hyphen (invisible in many renders)
]
SHELL_INJECTION_PATTERNS = [
r"\$\([^)]+\)", # $(command) subshell execution
]
# Inline backtick references (like `SOUL.md` or `python3 cmd`) are normal
# markdown formatting, not shell injection. Only flag multi-line backtick
# blocks that aren't fenced code blocks — but those are already handled
# by the code-block check. So we skip backtick patterns entirely to avoid
# false positives on standard markdown inline code.
# ---------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------
def sha256_file(path: Path) -> str:
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
def resolve_workspace(args: list[str]) -> Path:
"""Determine workspace path from args, env, or defaults."""
ws = None
for i, arg in enumerate(args):
if arg == "--workspace" and i + 1 < len(args):
ws = args[i + 1]
break
if ws is None:
ws = os.environ.get("OPENCLAW_WORKSPACE")
if ws is None:
cwd = Path.cwd()
if (cwd / "AGENTS.md").exists():
ws = str(cwd)
if ws is None:
ws = str(Path.home() / ".openclaw" / "workspace")
p = Path(ws)
if not p.is_dir():
print(f"ERROR: Workspace not found: {p}", file=sys.stderr)
sys.exit(1)
return p
def manifest_path(workspace: Path) -> Path:
return workspace / INTEGRITY_DIR / MANIFEST_FILE
def load_manifest(workspace: Path) -> dict | None:
mp = manifest_path(workspace)
if not mp.exists():
return None
with open(mp, "r", encoding="utf-8") as f:
return json.load(f)
def save_manifest(workspace: Path, manifest: dict):
mp = manifest_path(workspace)
mp.parent.mkdir(parents=True, exist_ok=True)
with open(mp, "w", encoding="utf-8") as f:
json.dump(manifest, f, indent=2)
def classify_file(rel_path: str) -> tuple[str, str]:
"""Return (category, severity_on_change) for a relative path."""
name = Path(rel_path).name
if name in CRITICAL_FILES:
return "critical", SEVERITY_WARNING
if rel_path == "MEMORY.md" or rel_path.startswith("memory/"):
if rel_path.endswith(".md"):
return "memory", SEVERITY_INFO
if rel_path.startswith("skills/") and name == "SKILL.md":
return "skills", SEVERITY_WARNING
if not "/" in rel_path and rel_path.endswith(".json"):
return "config", SEVERITY_WARNING
return "other", SEVERITY_INFO
def collect_monitored_files(workspace: Path) -> dict[str, Path]:
"""Collect all files that should be monitored, returning {rel_path: abs_path}."""
files = {}
# Critical files in workspace root
for name in CRITICAL_FILES:
p = workspace / name
if p.is_file():
files[name] = p
# MEMORY.md
p = workspace / "MEMORY.md"
if p.is_file():
files["MEMORY.md"] = p
# memory/*.md
mem_dir = workspace / "memory"
if mem_dir.is_dir():
for f in mem_dir.iterdir():
if f.is_file() and f.suffix == ".md":
rel = f.relative_to(workspace).as_posix()
files[rel] = f
# *.json in root (but not in .integrity/)
for f in workspace.iterdir():
if f.is_file() and f.suffix == ".json":
files[f.name] = f
# skills/*/SKILL.md
skills_dir = workspace / "skills"
if skills_dir.is_dir():
for skill in skills_dir.iterdir():
if skill.is_dir():
sm = skill / "SKILL.md"
if sm.is_file():
rel = sm.relative_to(workspace).as_posix()
files[rel] = sm
return files
def read_file_text(path: Path) -> str | None:
"""Read file as text, returning None if binary."""
try:
with open(path, "r", encoding="utf-8") as f:
return f.read()
except (UnicodeDecodeError, ValueError):
return None
# ---------------------------------------------------------------------------
# Commands
# ---------------------------------------------------------------------------
def snapshot_dir(workspace: Path) -> Path:
return workspace / INTEGRITY_DIR / SNAPSHOTS_DIR
def save_snapshot(workspace: Path, rel: str, abspath: Path):
"""Store a copy of a file for later restoration."""
dest = snapshot_dir(workspace) / rel
dest.parent.mkdir(parents=True, exist_ok=True)
import shutil
shutil.copy2(abspath, dest)
def get_snapshot_path(workspace: Path, rel: str) -> Path | None:
"""Get the snapshot path for a file, or None if no snapshot exists."""
p = snapshot_dir(workspace) / rel
return p if p.is_file() else None
def cmd_baseline(workspace: Path):
"""Establish or reset the integrity baseline."""
files = collect_monitored_files(workspace)
if not files:
print("No monitored files found in workspace.")
return
manifest = {
"version": MANIFEST_VERSION,
"created": now_iso(),
"updated": now_iso(),
"workspace": str(workspace),
"files": {},
}
snapshotted = 0
for rel, abspath in sorted(files.items()):
cat, _ = classify_file(rel)
stat = abspath.stat()
manifest["files"][rel] = {
"sha256": sha256_file(abspath),
"size": stat.st_size,
"modified": datetime.fromtimestamp(
stat.st_mtime, tz=timezone.utc
).isoformat(),
"category": cat,
}
# Snapshot critical and config files for restoration
if cat in ("critical", "config", "skills"):
save_snapshot(workspace, rel, abspath)
snapshotted += 1
save_manifest(workspace, manifest)
print(f"Baseline established: {len(manifest['files'])} files tracked, {snapshotted} snapshotted")
for rel in sorted(manifest["files"]):
cat = manifest["files"][rel]["category"]
has_snap = "S" if get_snapshot_path(workspace, rel) else " "
print(f" [{cat:8s}] [{has_snap}] {rel}")
def cmd_verify(workspace: Path) -> list[dict]:
"""Verify workspace files against baseline. Returns list of findings."""
manifest = load_manifest(workspace)
if manifest is None:
print("No baseline found. Run 'baseline' first.")
sys.exit(1)
findings = []
current_files = collect_monitored_files(workspace)
baseline_files = manifest.get("files", {})
# Check for modified and deleted files
for rel, info in sorted(baseline_files.items()):
if rel in current_files:
abspath = current_files[rel]
current_hash = sha256_file(abspath)
if current_hash != info["sha256"]:
cat, severity = classify_file(rel)
# Generate diff
diff_lines = []
old_text = None
new_text = read_file_text(abspath)
# We don't have the old content, but we can show the hash mismatch
finding = {
"type": "modified",
"file": rel,
"category": cat,
"severity": severity,
"old_hash": info["sha256"][:16] + "...",
"new_hash": current_hash[:16] + "...",
"old_size": info["size"],
"new_size": abspath.stat().st_size,
}
findings.append(finding)
else:
cat, severity = classify_file(rel)
findings.append({
"type": "deleted",
"file": rel,
"category": cat,
"severity": SEVERITY_WARNING if cat == "critical" else severity,
})
# Check for new untracked files
for rel in sorted(current_files):
if rel not in baseline_files:
cat, _ = classify_file(rel)
findings.append({
"type": "new",
"file": rel,
"category": cat,
"severity": SEVERITY_INFO,
})
return findings
def _is_inside_code_block(text: str, match_start: int) -> bool:
"""Check if a match position is inside a fenced code block."""
lines = text[:match_start].split("\n")
fence_count = 0
for line in lines:
stripped = line.strip()
if stripped.startswith("```") or stripped.startswith("~~~"):
fence_count += 1
return fence_count % 2 == 1
def scan_file_for_injections(path: Path, rel_path: str) -> list[dict]:
"""Scan a single file for prompt injection patterns."""
text = read_file_text(path)
if text is None:
return []
findings = []
def add_finding(pattern_type: str, detail: str, line_num: int, severity: str = SEVERITY_CRITICAL):
findings.append({
"type": "injection",
"file": rel_path,
"pattern_type": pattern_type,
"detail": detail,
"line": line_num,
"severity": severity,
})
def line_number_at(pos: int) -> int:
return text[:pos].count("\n") + 1
# Instruction override patterns
for pattern in INSTRUCTION_OVERRIDE_PATTERNS:
for m in re.finditer(pattern, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"instruction_override",
f"Instruction override pattern: '{m.group()[:80]}'",
ln,
)
# System prompt markers
for pattern in SYSTEM_PROMPT_MARKERS:
for m in re.finditer(pattern, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"system_prompt_marker",
f"System prompt marker: '{m.group()[:60]}'",
ln,
)
# HTML injection
for pattern in HTML_INJECTION_PATTERNS:
for m in re.finditer(pattern, text, re.IGNORECASE):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"html_injection",
f"HTML injection: '{m.group()[:60]}'",
ln,
)
# Markdown image exfiltration
for m in re.finditer(EXFIL_IMAGE_PATTERN, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"exfil_image",
f"Suspicious image URL with encoded data: '{m.group()[:80]}'",
ln,
)
# Base64 blobs outside code blocks
for m in re.finditer(BASE64_BLOB_PATTERN, text):
if not _is_inside_code_block(text, m.start()):
blob = m.group()
# Skip if it looks like a normal hash (exactly 64 hex chars = sha256)
if len(blob) <= 64 and re.fullmatch(r"[0-9a-fA-F]+", blob):
continue
# Skip short blobs that might be normal encoded values
if len(blob) < 80:
continue
ln = line_number_at(m.start())
add_finding(
"base64_payload",
f"Large base64 blob ({len(blob)} chars): '{blob[:40]}...'",
ln,
SEVERITY_WARNING,
)
# Unicode tricks
for char in UNICODE_TRICKS:
idx = text.find(char)
while idx != -1:
ln = line_number_at(idx)
add_finding(
"unicode_trick",
f"Hidden Unicode character U+{ord(char):04X} ({repr(char)})",
ln,
SEVERITY_WARNING,
)
idx = text.find(char, idx + 1)
# Shell injection outside code blocks
for pattern in SHELL_INJECTION_PATTERNS:
for m in re.finditer(pattern, text):
if not _is_inside_code_block(text, m.start()):
ln = line_number_at(m.start())
add_finding(
"shell_injection",
f"Possible shell injection: '{m.group()[:60]}'",
ln,
SEVERITY_WARNING,
)
return findings
def cmd_scan(workspace: Path) -> list[dict]:
"""Scan all workspace files for injection patterns."""
files = collect_monitored_files(workspace)
all_findings = []
for rel, abspath in sorted(files.items()):
# Skip our own skill files — they document injection patterns
# and will always trigger false positives
if rel.startswith("skills/openclaw-warden/"):
continue
# Skip quarantined skills — already neutralized
if QUARANTINE_PREFIX in rel:
continue
findings = scan_file_for_injections(abspath, rel)
all_findings.extend(findings)
return all_findings
def cmd_full(workspace: Path) -> tuple[list[dict], list[dict]]:
"""Run both verify and scan."""
verify_findings = cmd_verify(workspace)
scan_findings = cmd_scan(workspace)
return verify_findings, scan_findings
def cmd_status(workspace: Path):
"""Quick one-line workspace health summary."""
manifest = load_manifest(workspace)
if manifest is None:
print("STATUS: NO BASELINE — Run 'baseline' to initialize")
return
files = collect_monitored_files(workspace)
baseline_files = manifest.get("files", {})
modified = 0
deleted = 0
new = 0
for rel, info in baseline_files.items():
if rel in files:
if sha256_file(files[rel]) != info["sha256"]:
modified += 1
else:
deleted += 1
for rel in files:
if rel not in baseline_files:
new += 1
total = len(baseline_files)
updated = manifest.get("updated", "unknown")
if modified == 0 and deleted == 0 and new == 0:
print(f"STATUS: CLEAN — {total} files tracked, baseline from {updated}")
else:
parts = []
if modified:
parts.append(f"{modified} modified")
if deleted:
parts.append(f"{deleted} deleted")
if new:
parts.append(f"{new} new")
print(f"STATUS: CHANGED — {', '.join(parts)} ({total} tracked, baseline from {updated})")
def cmd_accept(workspace: Path, filepath: str):
"""Accept a changed file into the baseline."""
manifest = load_manifest(workspace)
if manifest is None:
print("No baseline found. Run 'baseline' first.")
sys.exit(1)
# Normalize the path
rel = filepath.replace("\\", "/")
abspath = workspace / rel
if not abspath.is_file():
print(f"File not found: {rel}")
sys.exit(1)
cat, _ = classify_file(rel)
stat = abspath.stat()
manifest["files"][rel] = {
"sha256": sha256_file(abspath),
"size": stat.st_size,
"modified": datetime.fromtimestamp(
stat.st_mtime, tz=timezone.utc
).isoformat(),
"category": cat,
}
manifest["updated"] = now_iso()
save_manifest(workspace, manifest)
print(f"Accepted: {rel} (category: {cat})")
# ---------------------------------------------------------------------------
# Countermeasures
# ---------------------------------------------------------------------------
def cmd_restore(workspace: Path, filepath: str):
"""Restore a file from its baseline snapshot."""
rel = filepath.replace("\\", "/")
snap = get_snapshot_path(workspace, rel)
if snap is None:
print(f"No snapshot found for: {rel}")
print("Only critical, config, and skill files are snapshotted.")
sys.exit(1)
dest = workspace / rel
import shutil
shutil.copy2(snap, dest)
print(f"Restored: {rel} (from baseline snapshot)")
# Verify the restore matched the baseline hash
manifest = load_manifest(workspace)
if manifest and rel in manifest.get("files", {}):
expected = manifest["files"][rel]["sha256"]
actual = sha256_file(dest)
if expected == actual:
print(f" Verified: hash matches baseline")
else:
print(f" WARNING: restored file hash does not match baseline")
print(f" Expected: {expected[:16]}...")
print(f" Got: {actual[:16]}...")
def cmd_rollback(workspace: Path, filepath: str):
"""Rollback a file to its last git-committed state."""
import subprocess
rel = filepath.replace("\\", "/")
abspath = workspace / rel
# Check if workspace is a git repo
git_dir = workspace / ".git"
if not git_dir.exists():
print(f"Workspace is not a git repository. Cannot rollback.")
print(f"Use 'restore' to restore from snapshot instead.")
sys.exit(1)
# Check if file is tracked by git
result = subprocess.run(
["git", "ls-files", rel],
cwd=str(workspace),
capture_output=True, text=True,
)
if not result.stdout.strip():
print(f"File is not tracked by git: {rel}")
sys.exit(1)
# Checkout from HEAD
result = subprocess.run(
["git", "checkout", "HEAD", "--", rel],
cwd=str(workspace),
capture_output=True, text=True,
)
if result.returncode != 0:
print(f"Git rollback failed: {result.stderr.strip()}")
sys.exit(1)
print(f"Rolled back: {rel} (to last git commit)")
print(f" Hash: {sha256_file(abspath)[:16]}...")
def cmd_quarantine(workspace: Path, skill_name: str):
"""Quarantine a skill by renaming its directory so OpenClaw won't load it."""
skills_dir = workspace / "skills"
skill_dir = skills_dir / skill_name
if not skill_dir.is_dir():
# Check if already quarantined
quarantined = skills_dir / (QUARANTINE_PREFIX + skill_name)
if quarantined.is_dir():
print(f"Skill '{skill_name}' is already quarantined.")
return
print(f"Skill not found: {skill_name}")
print(f"Available skills:")
if skills_dir.is_dir():
for d in sorted(skills_dir.iterdir()):
if d.is_dir():
prefix = "[Q] " if d.name.startswith(QUARANTINE_PREFIX) else " "
name = d.name.removeprefix(QUARANTINE_PREFIX) if d.name.startswith(QUARANTINE_PREFIX) else d.name
print(f" {prefix}{name}")
sys.exit(1)
quarantined = skills_dir / (QUARANTINE_PREFIX + skill_name)
skill_dir.rename(quarantined)
print(f"Quarantined: {skill_name}")
print(f" Moved: skills/{skill_name}/ -> skills/{QUARANTINE_PREFIX}{skill_name}/")
print(f" OpenClaw will not load this skill on next session.")
print(f" To restore: run 'unquarantine {skill_name}'")
def cmd_unquarantine(workspace: Path, skill_name: str):
"""Restore a quarantined skill."""
skills_dir = workspace / "skills"
quarantined = skills_dir / (QUARANTINE_PREFIX + skill_name)
if not quarantined.is_dir():
print(f"No quarantined skill found: {skill_name}")
sys.exit(1)
restored = skills_dir / skill_name
if restored.is_dir():
print(f"Cannot unquarantine: skills/{skill_name}/ already exists")
sys.exit(1)
quarantined.rename(restored)
print(f"Unquarantined: {skill_name}")
print(f" Moved: skills/{QUARANTINE_PREFIX}{skill_name}/ -> skills/{skill_name}/")
print(f" WARNING: Re-scan this skill before use.")
def cmd_protect(workspace: Path):
"""Full scan + automatic countermeasures for critical threats.
Actions taken:
1. Run full integrity verification + injection scan
2. For critical injection findings: restore from snapshot if available
3. For modified critical files with injections: restore from snapshot
4. For skills containing injections: quarantine the skill
5. Report all actions taken
"""
print("=" * 60)
print("WORKSPACE PROTECTION SWEEP")
print("=" * 60)
print(f"Timestamp: {now_iso()}")
print()
verify_findings, scan_findings = cmd_full(workspace)
actions_taken = []
if not verify_findings and not scan_findings:
print("No threats detected. Workspace is clean.")
print("=" * 60)
return
# Identify files with injection findings
injected_files = set()
for f in scan_findings:
if f.get("severity") == SEVERITY_CRITICAL:
injected_files.add(f["file"])
# Identify modified critical files
modified_critical = set()
for f in verify_findings:
if f["type"] == "modified" and f["category"] == "critical":
modified_critical.add(f["file"])
print()
print("-" * 40)
print("COUNTERMEASURES")
print("-" * 40)
# Restore critical files that were modified and have injections
files_to_restore = (modified_critical & injected_files) | modified_critical
for rel in sorted(files_to_restore):
snap = get_snapshot_path(workspace, rel)
if snap:
import shutil
dest = workspace / rel
shutil.copy2(snap, dest)
actions_taken.append(f"RESTORED: {rel} (from baseline snapshot)")
print(f" [RESTORE] {rel} <- baseline snapshot")
else:
# Try git rollback
git_dir = workspace / ".git"
if git_dir.exists():
import subprocess
result = subprocess.run(
["git", "checkout", "HEAD", "--", rel],
cwd=str(workspace),
capture_output=True, text=True,
)
if result.returncode == 0:
actions_taken.append(f"ROLLED BACK: {rel} (from git)")
print(f" [ROLLBACK] {rel} <- git HEAD")
else:
actions_taken.append(f"FAILED TO RESTORE: {rel}")
print(f" [FAILED] {rel} — no snapshot or git history")
else:
actions_taken.append(f"UNRESOLVED: {rel} — no snapshot available")
print(f" [MANUAL] {rel} — no snapshot, review manually")
# Quarantine skills with critical injection findings
quarantined_skills = set()
for rel in injected_files:
if rel.startswith("skills/") and "/SKILL.md" in rel:
# Extract skill name: skills/<name>/SKILL.md
parts = rel.split("/")
if len(parts) >= 2:
skill_name = parts[1]
if skill_name.startswith(QUARANTINE_PREFIX):
continue
if skill_name == "openclaw-warden":
continue
if skill_name not in quarantined_skills:
skill_dir = workspace / "skills" / skill_name
quarantined_dir = workspace / "skills" / (QUARANTINE_PREFIX + skill_name)
if skill_dir.is_dir():
skill_dir.rename(quarantined_dir)
quarantined_skills.add(skill_name)
actions_taken.append(f"QUARANTINED: skill '{skill_name}'")
print(f" [QUARANTINE] skills/{skill_name}/")
# Handle injected memory/other files (can't auto-restore, flag for review)
for rel in sorted(injected_files):
if rel.startswith("skills/"):
continue # Handled above
if rel in files_to_restore:
continue # Already restored
actions_taken.append(f"FLAGGED: {rel} — contains injections, manual review needed")
print(f" [FLAG] {rel} — injection detected, review manually")
print()
if actions_taken:
print(f"ACTIONS TAKEN: {len(actions_taken)}")
for a in actions_taken:
print(f" - {a}")
else:
print("No automatic actions taken. Review findings above.")
print()
print("NEXT STEPS:")
if files_to_restore:
print(" - Restored files should be re-verified: run 'verify'")
if quarantined_skills:
print(" - Quarantined skills will not load on next session")
print(" - Investigate quarantined skills before unquarantining")
print(" - Run 'baseline' to update baseline after review")
print("=" * 60)
# ---------------------------------------------------------------------------
# Report formatting
# ---------------------------------------------------------------------------
def format_findings(verify_findings: list[dict], scan_findings: list[dict]) -> str:
"""Format findings into a structured text report."""
lines = []
lines.append("=" * 60)
lines.append("WORKSPACE INTEGRITY REPORT")
lines.append("=" * 60)
lines.append(f"Timestamp: {now_iso()}")
lines.append("")
total_issues = len(verify_findings) + len(scan_findings)
criticals = sum(1 for f in scan_findings if f.get("severity") == SEVERITY_CRITICAL)
warnings = sum(
1 for f in verify_findings + scan_findings
if f.get("severity") == SEVERITY_WARNING
)
infos = sum(
1 for f in verify_findings + scan_findings
if f.get("severity") == SEVERITY_INFO
)
if total_issues == 0:
lines.append("RESULT: CLEAN")
lines.append("No integrity violations or injection patterns detected.")
lines.append("=" * 60)
return "\n".join(lines)
lines.append(f"RESULT: {total_issues} ISSUE(S) FOUND")
if criticals:
lines.append(f" CRITICAL: {criticals}")
if warnings:
lines.append(f" WARNING: {warnings}")
if infos:
lines.append(f" INFO: {infos}")
lines.append("")
# Integrity findings
modified = [f for f in verify_findings if f["type"] == "modified"]
deleted = [f for f in verify_findings if f["type"] == "deleted"]
new = [f for f in verify_findings if f["type"] == "new"]
if modified:
lines.append("-" * 40)
lines.append("MODIFIED FILES")
lines.append("-" * 40)
for f in modified:
lines.append(f" [{f['severity']:8s}] {f['file']} ({f['category']})")
lines.append(f" Hash: {f['old_hash']} -> {f['new_hash']}")
lines.append(f" Size: {f['old_size']} -> {f['new_size']} bytes")
lines.append("")
if deleted:
lines.append("-" * 40)
lines.append("DELETED FILES")
lines.append("-" * 40)
for f in deleted:
lines.append(f" [{f['severity']:8s}] {f['file']} ({f['category']})")
lines.append("")
if new:
lines.append("-" * 40)
lines.append("NEW UNTRACKED FILES")
lines.append("-" * 40)
for f in new:
lines.append(f" [{f['severity']:8s}] {f['file']} ({f['category']})")
lines.append("")
# Injection findings
if scan_findings:
lines.append("-" * 40)
lines.append("INJECTION SCAN RESULTS")
lines.append("-" * 40)
by_file: dict[str, list[dict]] = {}
for f in scan_findings:
by_file.setdefault(f["file"], []).append(f)
for fname, findings in sorted(by_file.items()):
lines.append(f" {fname}:")
for f in findings:
lines.append(
f" [{f['severity']:8s}] Line {f['line']}: "
f"{f['pattern_type']} — {f['detail']}"
)
lines.append("")
lines.append("=" * 60)
lines.append("RECOMMENDED ACTIONS:")
if criticals:
lines.append(" 1. CRITICAL issues require immediate investigation.")
lines.append(" Review flagged files for unauthorized modifications.")
lines.append(" Do NOT load these files into the agent until verified.")
if modified:
lines.append(" - Review modified files. If changes are legitimate,")
lines.append(" run 'accept <file>' to update the baseline.")
if new:
lines.append(" - Review new files. Run 'baseline' to include them,")
lines.append(" or investigate why they appeared.")
lines.append("=" * 60)
return "\n".join(lines)
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main():
args = sys.argv[1:]
if not args or args[0] in ("-h", "--help", "help"):
print(__doc__.strip())
sys.exit(0)
command = args[0]
rest = args[1:]
if command == "baseline":
workspace = resolve_workspace(rest)
cmd_baseline(workspace)
elif command == "verify":
workspace = resolve_workspace(rest)
findings = cmd_verify(workspace)
report = format_findings(findings, [])
print(report)
if any(f["severity"] == SEVERITY_CRITICAL for f in findings):
sys.exit(2)
elif findings:
sys.exit(1)
elif command == "scan":
workspace = resolve_workspace(rest)
findings = cmd_scan(workspace)
report = format_findings([], findings)
print(report)
if any(f["severity"] == SEVERITY_CRITICAL for f in findings):
sys.exit(2)
elif findings:
sys.exit(1)
elif command == "full":
workspace = resolve_workspace(rest)
verify_findings, scan_findings = cmd_full(workspace)
report = format_findings(verify_findings, scan_findings)
print(report)
all_findings = verify_findings + scan_findings
if any(f["severity"] == SEVERITY_CRITICAL for f in all_findings):
sys.exit(2)
elif all_findings:
sys.exit(1)
elif command == "status":
workspace = resolve_workspace(rest)
cmd_status(workspace)
elif command == "accept":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py accept <file> [--workspace PATH]")
sys.exit(1)
filepath = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_accept(workspace, filepath)
elif command == "restore":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py restore <file> [--workspace PATH]")
sys.exit(1)
filepath = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_restore(workspace, filepath)
elif command == "rollback":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py rollback <file> [--workspace PATH]")
sys.exit(1)
filepath = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_rollback(workspace, filepath)
elif command == "quarantine":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py quarantine <skill-name> [--workspace PATH]")
sys.exit(1)
skill_name = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_quarantine(workspace, skill_name)
elif command == "unquarantine":
if not rest or rest[0].startswith("-"):
print("Usage: integrity.py unquarantine <skill-name> [--workspace PATH]")
sys.exit(1)
skill_name = rest[0]
workspace = resolve_workspace(rest[1:])
cmd_unquarantine(workspace, skill_name)
elif command == "protect":
workspace = resolve_workspace(rest)
cmd_protect(workspace)
else:
print(f"Unknown command: {command}")
print("Commands: baseline, verify, scan, full, status, accept,")
print(" restore, rollback, quarantine, unquarantine, protect")
sys.exit(1)
if __name__ == "__main__":
main()