文件预览

file_scanner.py

查看 🩺 memory-health-check 技能包中的文件内容。

文件内容

lib/file_scanner.py

#!/usr/bin/env python3
"""Reusable file system scanning utilities for memory-health-check."""
import logging
import re
import hashlib
from pathlib import Path
from datetime import datetime, timezone, timedelta
from typing import Optional

logger = logging.getLogger("memory-health-check.file_scanner")


class FileScanner:
    """Reusable file system scanner for memory directories."""
    
    TEMP_PATTERNS = {".DS_Store", "Thumbs.db", "desktop.ini", "~"}
    MAX_FILE_SIZE_MB = 500  # Skip files larger than this
    
    def __init__(self, base_dir: Path):
        """Initialize scanner for a base directory.
        
        Args:
            base_dir: Base directory to scan
        """
        self.base_dir = base_dir
    
    def get_dir_size(self, follow_symlinks: bool = False) -> int:
        """Recursively compute directory size in bytes.
        
        Args:
            follow_symlinks: Whether to follow symlinks
            
        Returns:
            Total bytes
        """
        total = 0
        try:
            for f in self.base_dir.rglob("*"):
                if f.is_file() and (follow_symlinks or not f.is_symlink()):
                    try:
                        total += f.stat().st_size
                    except OSError:
                        pass
        except Exception as e:
            logger.warning(f"Error computing dir size: {e}")
        return total
    
    def get_file_counts(self) -> dict[str, int]:
        """Count files by extension/type.
        
        Returns:
            Dict: {"total": int, "md": int, "sqlite": int, "json": int, "other": int}
        """
        counts = {"total": 0, "md": 0, "sqlite": 0, "json": 0, "other": 0}
        
        try:
            for f in self.base_dir.rglob("*"):
                if f.is_file():
                    counts["total"] += 1
                    ext = f.suffix.lower()
                    if ext == ".md":
                        counts["md"] += 1
                    elif ext == ".sqlite":
                        counts["sqlite"] += 1
                    elif ext == ".json":
                        counts["json"] += 1
                    else:
                        counts["other"] += 1
        except Exception as e:
            logger.warning(f"Error counting files: {e}")
        
        return counts
    
    def get_file_ages(self) -> list[tuple[Path, datetime]]:
        """Get modification times for all files.
        
        Returns:
            List of (file_path, mtime_datetime) tuples, sorted newest first
        """
        files_with_mtime = []
        
        try:
            for f in self.base_dir.rglob("*"):
                if f.is_file():
                    try:
                        mtime = datetime.fromtimestamp(f.stat().st_mtime, tz=timezone.utc)
                        files_with_mtime.append((f, mtime))
                    except OSError:
                        pass
        except Exception as e:
            logger.warning(f"Error getting file ages: {e}")
        
        files_with_mtime.sort(key=lambda x: x[1], reverse=True)
        return files_with_mtime
    
    def get_age_distribution(self, now: datetime = None) -> dict[str, int]:
        """Compute age distribution across categories.
        
        Args:
            now: Reference datetime (default: now)
            
        Returns:
            Dict: {"<7d": int, "7-30d": int, "30-90d": int, ">90d": int, "total": int}
        """
        if now is None:
            now = datetime.now(tz=timezone.utc)
        
        week_ago = now - timedelta(days=7)
        month_ago = now - timedelta(days=30)
        quarter_ago = now - timedelta(days=90)
        
        dist = {"<7d": 0, "7-30d": 0, "30-90d": 0, ">90d": 0, "total": 0}
        
        for _, mtime in self.get_file_ages():
            dist["total"] += 1
            if mtime >= week_ago:
                dist["<7d"] += 1
            elif mtime >= month_ago:
                dist["7-30d"] += 1
            elif mtime >= quarter_ago:
                dist["30-90d"] += 1
            else:
                dist[">90d"] += 1
        
        return dist
    
    def find_temp_files(self) -> list[Path]:
        """Find temporary files (.DS_Store, etc.).
        
        Returns:
            List of temp file paths
        """
        temp_files = []
        
        try:
            for f in self.base_dir.rglob("*"):
                if f.is_file() and f.name in self.TEMP_PATTERNS:
                    temp_files.append(f)
        except Exception as e:
            logger.warning(f"Error finding temp files: {e}")
        
        return temp_files
    
    def find_empty_files(self) -> list[Path]:
        """Find empty .md files.
        
        Returns:
            List of empty file paths
        """
        empty_files = []
        
        try:
            for f in self.base_dir.rglob("*.md"):
                if f.is_file():
                    try:
                        if f.stat().st_size == 0:
                            empty_files.append(f)
                    except OSError:
                        pass
        except Exception as e:
            logger.warning(f"Error finding empty files: {e}")
        
        return empty_files
    
    def compute_checksum(self, file_path: Path) -> Optional[str]:
        """Compute SHA256 checksum of a file.
        
        Args:
            file_path: Path to file
            
        Returns:
            Hex digest string, or None on error
        """
        try:
            h = hashlib.sha256()
            with open(file_path, "rb") as f:
                for chunk in iter(lambda: f.read(65536), b""):
                    h.update(chunk)
            return h.hexdigest()
        except Exception:
            return None
    
    def find_wikilinks(self, content: str) -> list[str]:
        """Extract Obsidian-style [[wikilinks]] from content.
        
        Args:
            content: Text content
            
        Returns:
            List of link targets
        """
        return re.findall(r"\[\[([^\]|]+)(?:\|[^\]]+)?\]]", content)
    
    def find_markdown_links(self, content: str) -> list[str]:
        """Extract Markdown [text](url) links from content.
        
        Args:
            content: Text content
            
        Returns:
            List of link targets
        """
        return re.findall(r"\[[^\]]+\]\(([^)]+)\)", content)


if __name__ == "__main__":
    scanner = FileScanner(Path.home() / ".openclaw" / "workspace" / "memory")
    print(f"Dir size: {scanner.get_dir_size()} bytes")
    print(f"File counts: {scanner.get_file_counts()}")
    print(f"Age dist: {scanner.get_age_distribution()}")