文件预览

bloat_detector.py

查看 🩺 memory-health-check 技能包中的文件内容。

文件内容

bin/bloat_detector.py

#!/usr/bin/env python3
"""Bloat detection — DB size, file count, growth rate."""
import argparse
import json
import logging
from pathlib import Path
from datetime import datetime, timezone, timedelta
from typing import Optional

import sys
sys.path.insert(0, str(Path(__file__).parent.parent / "lib"))

from config_loader import load_config
from file_scanner import FileScanner
from health_models import DimResult, GrowthRate
from log_utils import get_logger

logger = get_logger("memory-health-check.bloat_detector")


def get_dir_size(path: Path, follow_symlinks: bool = False) -> int:
    """Sum the sizes of all regular files found beneath ``path``.

    The walk is best-effort: a file whose ``stat()`` fails is ignored, and any
    other exception raised while traversing ends the walk early and returns
    whatever was accumulated up to that point.

    Args:
        path: Directory to walk recursively.
        follow_symlinks: When False (the default), entries that are symlinks
            are left out of the total; when True they are counted using the
            size reported by ``stat()``.

    Returns:
        Accumulated size in bytes (0 when nothing could be read).
    """
    size_in_bytes = 0
    try:
        for entry in path.rglob("*"):
            if not entry.is_file():
                continue
            # Only consult is_symlink() when symlinks are to be excluded.
            if not follow_symlinks and entry.is_symlink():
                continue
            try:
                size_in_bytes += entry.stat().st_size
            except OSError:
                # File vanished or is unreadable; leave it out of the total.
                continue
    except Exception:
        # Traversal failed part-way; report the partial total.
        return size_in_bytes
    return size_in_bytes


def get_file_counts(base_dir: Path) -> dict:
    """Return the per-type file tally that ``FileScanner`` reports for ``base_dir``.

    Args:
        base_dir: Directory handed to ``FileScanner``.

    Returns:
        The result of ``FileScanner(base_dir).get_file_counts()``. Presumably
        shaped like {"total": int, "md": int, "sqlite": int, "json": int,
        "other": int} — confirm against ``FileScanner``.
    """
    return FileScanner(base_dir).get_file_counts()


def _read_report_total_mb(report_file: Path) -> Optional[float]:
    """Return the bloat ``total_mb`` stored in one health report, or None if unusable."""
    try:
        data = json.loads(report_file.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Unreadable, badly encoded or malformed JSON: skip this report.
        return None

    # Walk dimensions -> bloat -> value -> total_mb, bailing out on any
    # unexpected shape instead of inventing a 0 MB data point.
    node = data
    for key in ("dimensions", "bloat", "value"):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    if not isinstance(node, dict):
        return None

    total_mb = node.get("total_mb")
    if isinstance(total_mb, bool) or not isinstance(total_mb, (int, float)):
        return None
    return total_mb


def get_growth_rate(
    base_dir: Path,
    report_dir: Optional[Path] = None,
    snapshots: int = 4,
    interval_days: int = 7,
) -> "Optional[GrowthRate]":
    """Estimate storage growth from previously written health reports.

    The most recent ``snapshots`` files matching ``health-report-*.json`` are
    read, their ``dimensions.bloat.value.total_mb`` values are arranged from
    oldest to newest, and a least-squares line is fitted through them. The
    slope (MB per report) is converted to MB per week using ``interval_days``.

    Reports that cannot be read, are not valid JSON, or do not carry a numeric
    ``total_mb`` are skipped rather than counted as 0 MB.

    NOTE(review): recency is decided by filename order, which assumes the
    report names embed a lexicographically sortable timestamp — confirm.

    Args:
        base_dir: Workspace root; only used to derive the default report_dir.
        report_dir: Directory holding the reports. Defaults to
            ``base_dir / "memory" / "health-reports"``.
        snapshots: Maximum number of most recent reports to use.
        interval_days: Expected number of days between consecutive reports
            (values below 1 are treated as 1).

    Returns:
        A GrowthRate, or None when the directory is missing or fewer than two
        usable data points are available.
    """
    if report_dir is None:
        report_dir = base_dir / "memory" / "health-reports"

    if not report_dir.exists():
        return None

    # Newest first, so the slice below keeps the most recent reports.
    reports = sorted(report_dir.glob("health-report-*.json"), reverse=True)
    if len(reports) < 2:
        return None

    newest_first = [
        total_mb
        for total_mb in map(_read_report_total_mb, reports[:snapshots])
        if total_mb is not None
    ]
    if len(newest_first) < 2:
        return None

    # Regress in chronological order so a positive slope means growth.
    sizes_mb = newest_first[::-1]
    n = len(sizes_mb)

    # Least-squares slope of size against report index (0 = oldest).
    x_mean = (n - 1) / 2
    y_mean = sum(sizes_mb) / n
    numerator = sum((i - x_mean) * (mb - y_mean) for i, mb in enumerate(sizes_mb))
    denominator = sum((i - x_mean) ** 2 for i in range(n))
    # n >= 2 guarantees a non-zero denominator; keep the guard for safety.
    slope = numerator / denominator if denominator else 0.0

    # slope is MB per report; one report is expected every interval_days.
    growth_per_week = slope * (7 / max(interval_days, 1))
    current_size = sizes_mb[-1]  # newest report
    projected_90d = current_size + (growth_per_week * (90 / 7))

    if growth_per_week > 10:
        trend = "increasing"
    elif growth_per_week < -10:
        trend = "decreasing"
    else:
        trend = "stable"

    return GrowthRate(
        growth_rate_mb_per_week=round(growth_per_week, 2),
        trend=trend,
        projected_90d_mb=round(projected_90d, 2),
        historical_points=n,
        method="linear",
    )


def bloat_detection(
    base_dir: Optional[Path] = None,
    include_growth: bool = True,
) -> dict:
    """Measure the size of the ``memory`` directory and grade it against thresholds.

    Thresholds are read from ``load_config()`` under ``thresholds.bloat_mb``
    (keys ``healthy``, ``warning``, ``critical``; defaults 500 / 2000 / 5000).
    When growth analysis is enabled and the 90-day projection exceeds the
    critical threshold with a positive growth rate, the date on which the
    critical threshold is expected to be reached is estimated as well.

    Args:
        base_dir: Workspace root containing ``memory``. Defaults to
            ``~/.openclaw/workspace``.
        include_growth: Whether to run ``get_growth_rate`` and the
            critical-date projection.

    Returns:
        dict with keys:
            "score": 100, 60 or 20.
            "status": "healthy", "warning" or "critical".
            "total_bytes": size of the memory directory in bytes.
            "total_mb": the same size in MB, rounded to 2 decimals.
            "file_counts": result of ``get_file_counts``.
            "growth_rate": dict of growth figures, or None when unavailable.
            "projected_critical_date": "YYYY-MM-DD" (UTC) or None. Never
                earlier than today: if the store is already past the critical
                threshold, today's date is reported.
    """
    if base_dir is None:
        base_dir = Path.home() / ".openclaw" / "workspace"

    config = load_config()
    thresholds = config.get("thresholds", {}).get("bloat_mb", {})
    healthy_mb = thresholds.get("healthy", 500)
    warning_mb = thresholds.get("warning", 2000)
    critical_mb = thresholds.get("critical", 5000)

    memory_dir = base_dir / "memory"

    if not memory_dir.exists():
        # Nothing stored yet: report a perfectly healthy, empty result.
        return {
            "score": 100,
            "status": "healthy",
            "total_bytes": 0,
            "total_mb": 0.0,
            "file_counts": {"total": 0, "md": 0, "sqlite": 0, "json": 0, "other": 0},
            "growth_rate": None,
            "projected_critical_date": None,
        }

    total_bytes = get_dir_size(memory_dir)
    total_mb = total_bytes / (1024 * 1024)
    file_counts = get_file_counts(memory_dir)

    # NOTE(review): status turns "critical" at warning_mb; critical_mb is only
    # used for the projection below. Confirm this tiering is intended.
    if total_mb < healthy_mb:
        status = "healthy"
        score = 100
    elif total_mb < warning_mb:
        status = "warning"
        score = 60
    else:
        status = "critical"
        score = 20

    growth_rate = get_growth_rate(base_dir) if include_growth else None
    projected_critical = None

    if (
        growth_rate
        and growth_rate.projected_90d_mb > critical_mb
        and growth_rate.growth_rate_mb_per_week > 0
    ):
        # The 0.1 MB/week floor keeps tiny rates from producing absurdly
        # distant dates.
        weeks_to_critical = (critical_mb - total_mb) / max(
            growth_rate.growth_rate_mb_per_week, 0.1
        )
        # Already past the threshold -> clamp to today instead of a past date.
        days_to_critical = max(int(weeks_to_critical * 7), 0)
        try:
            future_date = datetime.now(tz=timezone.utc) + timedelta(days=days_to_critical)
        except OverflowError:
            # Projection lies beyond the representable date range.
            future_date = None
        if future_date is not None:
            projected_critical = future_date.strftime("%Y-%m-%d")

    growth_summary = None
    if growth_rate:
        growth_summary = {
            "growth_rate_mb_per_week": growth_rate.growth_rate_mb_per_week,
            "trend": growth_rate.trend,
            "projected_90d_mb": growth_rate.projected_90d_mb,
            "historical_points": growth_rate.historical_points,
        }

    return {
        "score": score,
        "status": status,
        "total_bytes": total_bytes,
        "total_mb": round(total_mb, 2),
        "file_counts": file_counts,
        "growth_rate": growth_summary,
        "projected_critical_date": projected_critical,
    }


if __name__ == "__main__":
    # Command-line entry point: run one detection pass and print the result.
    cli = argparse.ArgumentParser(description="Detect memory bloat")
    cli.add_argument("--base-dir", type=Path, default=None)
    cli.add_argument("--no-growth", action="store_true")
    cli.add_argument("-v", "--verbose", action="store_true")
    opts = cli.parse_args()

    # -v switches the logger returned by get_logger() to DEBUG.
    if opts.verbose:
        get_logger().setLevel(logging.DEBUG)

    result = bloat_detection(base_dir=opts.base_dir, include_growth=not opts.no_growth)

    # One-line summary first, then the full result as pretty-printed JSON.
    summary_line = f"[bloat_detector] Status: {result['status']}, Size: {result.get('total_mb', 0)}MB, Score: {result['score']}"
    print(summary_line)
    print(json.dumps(result, indent=2, ensure_ascii=False))