文件预览

generate_report.py

查看 🩺 memory-health-check 技能包中的文件内容。

文件内容

scripts/generate_report.py

#!/usr/bin/env python3
"""Generate health report."""
import argparse
import json
import logging
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path

logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
logger = logging.getLogger("generate_report")

SKILL_DIR = Path(__file__).parent.parent
SKILL_BIN = SKILL_DIR / "bin"
REPORT_DIR = Path.home() / ".openclaw" / "workspace" / "memory" / "health-reports"


def parse_json_output(output: str) -> dict:
    """Parse the last JSON object from script output (robust)."""
    depth = 0
    start = -1
    for i, ch in enumerate(output):
        if ch == '{':
            if depth == 0:
                start = i
            depth += 1
        elif ch == '}':
            depth -= 1
            if depth == 0 and start >= 0:
                try:
                    return json.loads(output[start:i+1])
                except json.JSONDecodeError:
                    pass
    return {}


def run_script(script_name: str) -> dict:
    """Run a bin script and parse its JSON output."""
    script_path = SKILL_BIN / script_name
    try:
        result = subprocess.run(
            [sys.executable, str(script_path), "--json"],
            capture_output=True, text=True, timeout=120
        )
        if result.returncode == 0:
            parsed = parse_json_output(result.stdout)
            if parsed:
                return parsed
            logger.warning(f"No valid JSON from {script_name}: {result.stdout[:100]}")
        else:
            logger.warning(f"{script_name} exited {result.returncode}: {result.stderr[:200]}")
    except subprocess.TimeoutExpired:
        logger.warning(f"{script_name} timed out")
    except Exception as e:
        logger.warning(f"Error running {script_name}: {e}")
    return {}


def generate_recommendations(results: dict) -> list[dict]:
    """Generate actionable recommendations based on dimension scores."""
    recommendations = []

    dim_scores = results.get("dimensions", {})
    dim_status = results.get("dimension_status", {})

    for dim in ["integrity", "bloat", "orphans", "dedup", "freshness"]:
        score = dim_scores.get(dim, 100)
        status = dim_status.get(dim, "unknown")

        if status == "critical" or (isinstance(score, (int, float)) and score < 50):
            recommendations.append({
                "priority": "high",
                "dimension": dim,
                "message": f"CRITICAL: {dim} needs immediate attention (score: {score})",
            })
        elif status == "warning" or (isinstance(score, (int, float)) and score >= 50 and score < 80):
            recommendations.append({
                "priority": "medium",
                "dimension": dim,
                "message": f"WARNING: {dim} could be improved (score: {score})",
            })

    # Add bloat-specific recommendations
    if dim_scores.get("bloat", 100) < 80:
        recommendations.append({
            "priority": "medium",
            "dimension": "bloat",
            "message": "Consider running --auto-repair or manually cleaning old memory files",
        })

    return recommendations


def generate_report(verbose: bool = False) -> dict:
    """Generate a comprehensive health report."""
    if verbose:
        logger.setLevel(logging.DEBUG)

    REPORT_DIR.mkdir(parents=True, exist_ok=True)

    logger.info("Collecting health data...")

    # Gather all dimension results
    integrity = run_script("integrity_scan.py")
    bloat = run_script("bloat_detector.py")
    orphans = run_script("orphan_finder.py")
    dedup = run_script("dedup_scanner.py")
    freshness = run_script("freshness_report.py")

    dim_scores = {
        "integrity": integrity.get("score", 0) if isinstance(integrity, dict) else 0,
        "bloat": bloat.get("score", 0) if isinstance(bloat, dict) else 0,
        "orphans": orphans.get("score", 0) if isinstance(orphans, dict) else 0,
        "dedup": dedup.get("score", 0) if isinstance(dedup, dict) else 0,
        "freshness": freshness.get("score", 0) if isinstance(freshness, dict) else 0,
    }
    dim_status = {
        "integrity": integrity.get("status", "unknown") if isinstance(integrity, dict) else "unknown",
        "bloat": bloat.get("status", "unknown") if isinstance(bloat, dict) else "unknown",
        "orphans": orphans.get("status", "unknown") if isinstance(orphans, dict) else "unknown",
        "dedup": dedup.get("status", "unknown") if isinstance(dedup, dict) else "unknown",
        "freshness": freshness.get("status", "unknown") if isinstance(freshness, dict) else "unknown",
    }

    weights = {"integrity": 0.30, "bloat": 0.20, "orphans": 0.15, "dedup": 0.15, "freshness": 0.20}
    overall = round(sum(dim_scores[k] * weights[k] for k in weights), 1)
    overall_status = "healthy" if overall >= 80 else ("warning" if overall >= 50 else "critical")

    recommendations = generate_recommendations({
        "dimensions": dim_scores,
        "dimension_status": dim_status,
    })

    report = {
        "ts": datetime.now(tz=timezone.utc).isoformat(),
        "overall_score": overall,
        "status": overall_status,
        "dimensions": {
            "integrity": {"score": dim_scores["integrity"], "status": dim_status["integrity"]},
            "bloat": {"score": dim_scores["bloat"], "status": dim_status["bloat"], "total_mb": bloat.get("total_mb", 0) if isinstance(bloat, dict) else 0},
            "orphans": {"score": dim_scores["orphans"], "status": dim_status["orphans"], "orphan_count": orphans.get("orphan_count", 0) if isinstance(orphans, dict) else 0},
            "dedup": {"score": dim_scores["dedup"], "status": dim_status["dedup"], "dup_count": dedup.get("dup_count", 0) if isinstance(dedup, dict) else 0},
            "freshness": {"score": dim_scores["freshness"], "status": dim_status["freshness"], "freshness_rate": freshness.get("freshness_rate", 0) if isinstance(freshness, dict) else 0},
        },
        "recommendations": recommendations,
    }

    # Save report
    date_str = datetime.now(tz=timezone.utc).strftime("%Y-%m-%d_%H-%M")
    report_file = REPORT_DIR / f"health-report-{date_str}.json"
    report_file.write_text(json.dumps(report, indent=2, ensure_ascii=False))
    logger.info(f"Report saved to {report_file}")

    return report


def main():
    parser = argparse.ArgumentParser(description="Generate health report")
    parser.add_argument("--json", action="store_true", help="Output JSON only")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    args = parser.parse_args()

    result = generate_report(verbose=args.verbose)

    if args.json:
        print(json.dumps(result, indent=2, ensure_ascii=False))
    else:
        print(f"[generate_report] Overall: {result['overall_score']}/100 ({result['status']})")
        for rec in result.get("recommendations", []):
            print(f"  [{rec['priority'].upper()}] {rec['message']}")
        print(f"\nFull report saved to {REPORT_DIR}")


if __name__ == "__main__":
    main()