文件预览

workbench_v1.0.10.py

查看 EvoMap WorkBench v1.0.11 Mini 技能包中的文件内容。

文件内容

lib/workbench_v1.0.10.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
工作流引擎 v1.0.10 - 完整版
功能:工作流管理、执行统计、错误恢复
"""

from typing import Dict, List, Any, Optional
from datetime import datetime
import json
import time

from .ai_decision_evolution import AIDecisionEvolutionEngine


class EvoMapWorkBench:
    """EvoMap WorkBench 核心工作流引擎"""
    
    def __init__(self, config: Optional[Dict] = None):
        """初始化 WorkBench"""
        self.config = config or {}
        self.engine = AIDecisionEvolutionEngine()
        self.stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'start_time': datetime.utcnow().isoformat(),
            'execution_times': []
        }
        self.error_recovery_count = 0
        self.last_error = None
    
    def analyze(self, context: Dict) -> Dict:
        """分析问题"""
        start_time = time.time()
        
        decision = self.engine.make_decision(context)
        
        self.stats['total_tasks'] += 1
        execution_time = time.time() - start_time
        self.stats['execution_times'].append(execution_time)
        
        return decision
    
    def execute(self, decision: Dict) -> Dict:
        """执行决策"""
        try:
            # 模拟执行
            outcome = 'success'
            
            # 记录结果
            self.engine.record_outcome(decision['decision_id'], outcome)
            
            if outcome == 'success':
                self.stats['successful_tasks'] += 1
            else:
                self.stats['failed_tasks'] += 1
            
            return {
                'outcome': outcome,
                'decision': decision,
                'timestamp': datetime.utcnow().isoformat()
            }
        
        except Exception as e:
            self.last_error = str(e)
            self.error_recovery_count += 1
            
            # 错误恢复
            return self._recover_from_error(e, decision)
    
    def _recover_from_error(self, error: Exception, decision: Dict) -> Dict:
        """错误恢复"""
        # 记录错误
        error_info = {
            'error': str(error),
            'decision': decision.get('decision_id'),
            'timestamp': datetime.utcnow().isoformat()
        }
        
        # 尝试恢复
        recovery_decision = self.engine.make_decision({
            'error_type': 'execution_error',
            'error_message': str(error)
        })
        
        return {
            'outcome': 'recovered',
            'error': error_info,
            'recovery_decision': recovery_decision,
            'timestamp': datetime.utcnow().isoformat()
        }
    
    def get_stats(self) -> Dict:
        """获取统计信息"""
        execution_times = self.stats['execution_times']
        
        return {
            **self.stats,
            'success_rate': self.stats['successful_tasks'] / self.stats['total_tasks'] if self.stats['total_tasks'] > 0 else 0,
            'avg_execution_time': sum(execution_times) / len(execution_times) if execution_times else 0,
            'error_recovery_count': self.error_recovery_count,
            'last_error': self.last_error
        }
    
    def get_execution_report(self) -> Dict:
        """获取执行报告"""
        return {
            'summary': self.get_stats(),
            'evolution_report': self.engine.get_evolution_report(),
            'generated_at': datetime.utcnow().isoformat()
        }
    
    def batch_analyze(self, contexts: List[Dict]) -> List[Dict]:
        """批量分析问题"""
        results = []
        for context in contexts:
            result = self.analyze(context)
            results.append(result)
        return results
    
    def batch_execute(self, decisions: List[Dict]) -> List[Dict]:
        """批量执行决策"""
        results = []
        for decision in decisions:
            result = self.execute(decision)
            results.append(result)
        return results
    
    def reset_stats(self):
        """重置统计"""
        self.stats = {
            'total_tasks': 0,
            'successful_tasks': 0,
            'failed_tasks': 0,
            'start_time': datetime.utcnow().isoformat(),
            'execution_times': []
        }
        self.error_recovery_count = 0
        self.last_error = None


if __name__ == "__main__":
    # 测试工作流引擎
    print("=== 测试工作流引擎 ===\n")
    
    bench = EvoMapWorkBench()
    
    # 分析问题
    context = {'error_type': '429', 'scenario_name': 'rate_limit'}
    decision = bench.analyze(context)
    
    print(f"决策:{decision['decision']}")
    print(f"置信度:{decision['confidence']:.2%}\n")
    
    # 执行决策
    result = bench.execute(decision)
    print(f"执行结果:{result['outcome']}\n")
    
    # 获取统计
    stats = bench.get_stats()
    print(f"统计信息:")
    print(f"  总任务数:{stats['total_tasks']}")
    print(f"  成功率:{stats['success_rate']:.2%}")
    print(f"  平均执行时间:{stats['avg_execution_time']:.3f}s")
    print(f"  错误恢复次数:{stats['error_recovery_count']}\n")
    
    # 获取执行报告
    report = bench.get_execution_report()
    print(f"执行报告:{json.dumps(report, indent=2, ensure_ascii=False)[:500]}...")