文件预览

self_optimizer.py

查看 Self Optimizer 技能包中的文件内容。

文件内容

scripts/self_optimizer.py

#!/usr/bin/env python3
import argparse
import json
import re
import sys
from collections import Counter
from datetime import datetime, timedelta
import os
from pathlib import Path
from typing import List, Dict, Any, Optional

# OpenClaw imports (if available) - primarily for type hinting and potential future direct integration
# Not for direct execution via python3 script.py, but for when the script is imported/used within agent.
# We will pass necessary data to the script, not have it fetch data itself via default_api.


class SelfOptimizer:
    def __init__(self, logs_dir: Path, openclaw_home: Path):
        self.logs_dir = logs_dir
        self.openclaw_home = openclaw_home
        self.log_patterns = {
            "error": r"\b(Error|ERROR|exception|Exception|Fail):|\b(Failed to|error )", # Refined regex
            "restart": r"received SIGUSR1; restarting|received SIGTERM; shutting down",
            "gateway_status": r"gateway] listening on ws",
            "node_unavailable": r"remote bin probe skipped: node unavailable",
            "openrouter_403": r"403 Key limit exceeded",
            "config_change": r"config change detected; evaluating reload"
        }

    def _read_log_file(self, file_path, lines=500):
        try:
            if not os.path.exists(file_path):
                return []
            with open(file_path, "r", encoding="utf-8", errors="ignore") as f:
                return f.readlines()[-lines:]
        except (FileNotFoundError, PermissionError, IOError) as e:
            # Silently return empty list for missing or unreadable files
            return []
        except Exception as e:
            # Log unexpected errors but don't fail
            print(f"Warning: Error reading log file {file_path}: {e}", file=sys.stderr)
            return []

    def analyze_logs(self, minutes_back=60):
        try:
            gateway_log_dir = self.logs_dir
            openclaw_log_dir = self.logs_dir

            gateway_log_path = os.path.join(gateway_log_dir, "gateway.log")
            openclaw_log_path = os.path.join(openclaw_log_dir, "openclaw.log") # Assuming openclaw.log can also be in same dir

            all_logs = self._read_log_file(gateway_log_path)
            all_logs.extend(self._read_log_file(openclaw_log_path))
        except Exception as e:
            # Return empty analysis if log reading fails completely
            return {
                "errors": [f"Failed to read log files: {str(e)}"],
                "restarts": 0,
                "config_changes": 0,
                "openrouter_403s": 0,
                "node_unavailable_mentions": 0,
                "suggestions": ["Log file access error. Check file permissions and paths."]
            }

        recent_logs = []
        time_cutoff = datetime.now() - timedelta(minutes=minutes_back)

        for line in all_logs:
            try:
                # Assuming log format like: 2026-02-16T21:27:45.414Z [heartbeat] started
                # Or: 2026-02-16 23:50:54 PST ...
                
                # Try parsing ISO 8601 with timezone (Z)
                match_iso = re.match(r"^(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z)", line)
                if match_iso:
                    timestamp_str = match_iso.group(1)
                    log_time = datetime.strptime(timestamp_str, "%Y-%m-%dT%H:%M:%S.%fZ")
                else:
                    # Try parsing local time with timezone abbr (e.g., PST)
                    match_local = re.match(r"^(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2} \w{3})", line)
                    if match_local:
                        timestamp_str = match_local.group(1)
                        # Remove timezone abbreviation for parsing, as strptime doesn't handle them reliably
                        timestamp_without_tz = " ".join(timestamp_str.split(" ")[:-1])
                        # This assumes the current system's timezone for conversion, which is usually desired
                        log_time = datetime.strptime(timestamp_without_tz, "%Y-%m-%d %H:%M:%S")
                    else:
                        continue # Skip lines without a recognizable timestamp

                if log_time >= time_cutoff:
                    # Exclude the specific delivery-recovery success message
                    if not "delivery recovery complete: 0 recovered, 0 failed, 0 skipped" in line.lower():
                        recent_logs.append(line)
            except ValueError:
                # Log line might not have a perfect timestamp, skip
                continue

        analysis = {
            "errors": [],
            "restarts": 0,
            "config_changes": 0,
            "openrouter_403s": 0,
            "node_unavailable_mentions": 0,
            "suggestions": []
        }

        for line in recent_logs:
            if re.search(self.log_patterns["error"], line, re.IGNORECASE):
                analysis["errors"].append(line.strip())
            if re.search(self.log_patterns["restart"], line):
                analysis["restarts"] += 1
            if re.search(self.log_patterns["openrouter_403"], line):
                analysis["openrouter_403s"] += 1
            if re.search(self.log_patterns["node_unavailable"], line):
                analysis["node_unavailable_mentions"] += 1
            if re.search(self.log_patterns["config_change"], line):
                analysis["config_changes"] += 1
        
        # Add basic suggestions here
        if analysis["openrouter_403s"] > 0:
            analysis["suggestions"].append("OpenRouter API key might be exhausted or improperly configured. Check https://openrouter.ai/settings/keys.")
        if analysis["restarts"] >= 2: # More than one restart in the window is suspicious
            analysis["suggestions"].append("Frequent gateway restarts detected. Investigate recent configuration changes or system instability.")
        if analysis["errors"]:
            analysis["suggestions"].append(f"Found {len(analysis['errors'])} error(s) in logs. Review specific error messages for actionable insights.")
        if analysis["node_unavailable_mentions"] > 0:
            analysis["suggestions"].append(f"Node unavailable mentions detected ({analysis['node_unavailable_mentions']}). Ensure companion app is running and connected on 'ghost’s MacBook Pro'.")
        if analysis["config_changes"] >= 3:
            analysis["suggestions"].append("Multiple configuration changes detected. Reviewing `openclaw.json` for unintended changes or excessive patching might be beneficial.")

        return analysis

    def analyze_chat_history(self, chat_history_data: List[Dict[str, Any]], lookback_minutes=120):
        if not chat_history_data:
            return {"status": "No chat history data provided", "suggestions": []}

        try:
            chat_issues: Dict[str, int] = Counter()
            suggestions: List[str] = []

            problem_keywords = {
                "error": ["error", "fail", "issue", "problem", "bug"],
                "confusion": ["confused", "don't understand", "unclear", "huh?"],
                "performance": ["slow", "lag", "cost", "expensive", "quota"],
                "unavailability": ["unavailable", "not working", "disconnected"],
                "delegation": ["subagent failed", "spawn failed", "agent failed"]
            }

            time_cutoff = datetime.now() - timedelta(minutes=lookback_minutes)

            for entry in chat_history_data:
                try:
                    if not isinstance(entry, dict):
                        continue
                    if 'message' in entry and entry.get('role') == 'user': # Only analyze user messages
                        # Handle timestamp parsing gracefully
                        timestamp = entry.get('timestamp', '')
                        if not timestamp:
                            continue
                        try:
                            message_time = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                        except (ValueError, AttributeError):
                            continue
                        if message_time < time_cutoff:
                            continue

                        text = str(entry.get('message', '')).lower()
                        for issue_type, keywords in problem_keywords.items():
                            for keyword in keywords:
                                if keyword in text:
                                    chat_issues[issue_type] += 1
                except (KeyError, AttributeError, ValueError) as e:
                    # Skip malformed entries
                    continue
        except Exception as e:
            return {
                "status": f"Error analyzing chat history: {str(e)}",
                "issues_summary": {},
                "suggestions": ["Chat history analysis failed. Check data format."]
            }

        if chat_issues["error"] > 0:
            suggestions.append(f"Recurring errors detected in chat history ({chat_issues['error']} mentions from user). Investigate common error patterns in user interactions.")
        if chat_issues["confusion"] > 0:
            suggestions.append(f"User confusion detected in chat history ({chat_issues['confusion']} mentions). Consider clarifying responses or providing more context to the user.")
        if chat_issues["performance"] > 0:
            suggestions.append(f"User concerns about performance/cost detected in chat history ({chat_issues['performance']} mentions). Optimize resource usage or suggest cheaper models for user tasks.")
        if chat_issues["unavailability"] > 0:
            suggestions.append(f"Repeated tool/service unavailability mentioned by user ({chat_issues['unavailability']} mentions). Check external service statuses or authentication for used tools.")
        if chat_issues["delegation"] > 0:
            suggestions.append(f"Sub-agent/delegation failures mentioned by user ({chat_issues['delegation']} mentions). Review agent swarm routing, config, or sub-agent skill issues.")

        return {"status": "Analyzed", "issues_summary": dict(chat_issues), "suggestions": suggestions}

    def analyze_root_folder(self):
        # Placeholder for future implementation
        return {"status": "Not yet implemented", "suggestions": []}

    def propose_improvements(self, chat_history_data: Optional[List[Dict[str, Any]]] = None):
        try:
            log_analysis = self.analyze_logs()
        except Exception as e:
            log_analysis = {
                "errors": [f"Log analysis failed: {str(e)}"],
                "restarts": 0,
                "config_changes": 0,
                "openrouter_403s": 0,
                "node_unavailable_mentions": 0,
                "suggestions": ["Log analysis encountered an error. Check system logs for details."]
            }
        
        try:
            chat_history_analysis = self.analyze_chat_history(chat_history_data=chat_history_data)
        except Exception as e:
            chat_history_analysis = {
                "status": f"Error: {str(e)}",
                "issues_summary": {},
                "suggestions": ["Chat history analysis encountered an error."]
            }
        
        try:
            root_folder_analysis = self.analyze_root_folder()
        except Exception as e:
            root_folder_analysis = {
                "status": f"Error: {str(e)}",
                "suggestions": []
            }

        proposals = []
        if log_analysis.get("suggestions"):
            proposals.extend(log_analysis["suggestions"])
        if chat_history_analysis.get("suggestions"):
            proposals.extend(chat_history_analysis["suggestions"])
        if root_folder_analysis.get("suggestions"):
            proposals.extend(root_folder_analysis["suggestions"])
        
        if not proposals:
            proposals.append("No critical issues found in recent activity. Consider optimizing frequently used skills or automating routine checks for proactive maintenance.")

        overall_report = {
            "log_analysis": log_analysis,
            "chat_history_analysis": chat_history_analysis,
            "root_folder_analysis": root_folder_analysis,
            "proposals": proposals
        }
        return overall_report


def main():
    parser = argparse.ArgumentParser(description="OpenClaw Self-Optimizer Skill.")
    parser.add_argument("command", choices=["analyze"], default="analyze", nargs="?", help="The command to run (default: analyze).")
    parser.add_argument("--json", action="store_true", help="Output results in JSON format.")
    parser.add_argument("--chat-history-file", type=str, help="Path to a JSON file containing chat history data.")
    args = parser.parse_args()

    # Determine logs directory based on OPENCLAW_HOME
    openclaw_home = Path(os.environ.get("OPENCLAW_HOME", Path.home() / ".openclaw"))
    logs_dir = openclaw_home / "logs"

    optimizer = SelfOptimizer(logs_dir, openclaw_home)

    if args.command == "analyze":
        chat_history_data = []
        if args.chat_history_file:
            try:
                with open(args.chat_history_file, "r") as f:
                    chat_history_data = json.load(f)
            except FileNotFoundError:
                print(f"Error: Chat history file not found: {args.chat_history_file}", file=sys.stderr)
                sys.exit(1)
            except json.JSONDecodeError:
                print(f"Error: Invalid JSON in chat history file: {args.chat_history_file}", file=sys.stderr)
                sys.exit(1)

        report = optimizer.propose_improvements(chat_history_data=chat_history_data)
        if args.json:
            print(json.dumps(report, indent=2))
        else:
            print("\\n--- OpenClaw Self-Optimization Report ---")
            print("\\nLog Analysis:")
            print(f"  Recent Errors: {len(report['log_analysis']['errors'])} detected")
            for error in report['log_analysis']['errors']:
                print(f"    - {error}")
            print(f"  Gateway Restarts: {report['log_analysis']['restarts']} detected")
            print(f"  OpenRouter 403s: {report['log_analysis']['openrouter_403s']} detected")
            print(f"  Node Unavailable Mentions: {report['log_analysis']['node_unavailable_mentions']} detected")
            print(f"  Config Changes: {report['log_analysis']['config_changes']} detected")
            
            print("\\nChat History Analysis:")
            print(f"  Status: {report['chat_history_analysis']['status']}")
            if report['chat_history_analysis']['suggestions']:
                 print("  Suggestions:")
                 for suggestion in report['chat_history_analysis']['suggestions']:
                     print(f"    - {suggestion}")         

            print("\\nRoot Folder Analysis:")
            print(f"  Status: {report['root_folder_analysis']['status']}")
            
            print("\\n--- Self-Improvement Proposals ---")
            if report['proposals']:
                for i, proposal in enumerate(report['proposals']):
                    print(f'{i + 1}. {proposal}')
            else:
                print("No specific proposals at this time.")

    else:
        print(f"Unknown command: {args.command}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()