文件预览

agent_loops_integration.md

查看 AGI记忆模组 技能包中的文件内容。

文件内容

references/agent_loops_integration.md

# Agent Loops 集成指南

> 本文档说明如何将 Agent Memory System 与 Agent Loop 集成,包含完整的端到端示例、故障排查和最佳实践。

## 目录

1. [概述](#概述)
2. [集成方式](#集成方式)
3. [完整端到端示例](#完整端到端示例)
4. [故障排查指南](#故障排查指南)
5. [性能优化配置](#性能优化配置)
6. [实际应用场景](#实际应用场景)
7. [依赖和环境要求](#依赖和环境要求)
8. [配置参数详解](#配置参数详解)
9. [最佳实践](#最佳实践)

---

## 概述

Agent Loop 是智能体的核心执行模式,通过"感知-决策-行动-观察"循环完成任务。Agent Memory System 为 Agent Loop 提供记忆能力,实现跨会话的上下文保持和智能决策。

---

## 集成方式

### 方式 1:状态同步集成

```python
from scripts.state_capture import GlobalStateCapture

# 创建状态捕捉器
capture = GlobalStateCapture(
    user_id="user_123",
    storage_path="./state_storage",
)

# 在 Agent Loop 的每个节点同步状态
checkpoint_id = capture.sync_from_langgraph(
    state={"phase": "planning", "current_task": "task_1"},
    node_name="planner",
)
```

### 方式 2:事件订阅集成

```python
# 订阅关键事件
subscription_id = capture.subscribe(
    event_types=[
        StateEventType.PHASE_CHANGE,
        StateEventType.TASK_SWITCH,
        StateEventType.ERROR_OCCURRED,
    ],
    callback=on_state_change,
)

def on_state_change(event):
    # 处理状态变化
    print(f"状态变化: {event.event_type}")
```

### 方式 3:完整循环集成

```python
from scripts.context_orchestrator import create_context_orchestrator

# 创建编排器
orchestrator = create_context_orchestrator(
    user_id="user_123",
    session_id="session_456",
    max_context_tokens=32000,
)

# 在 Agent Loop 的每个步骤使用
for step in agent_loop:
    # 准备上下文(包含历史记忆)
    context = orchestrator.prepare_context(
        user_input=step.input,
        system_instruction=step.instruction,
    )

    # 执行决策
    action = decide(context)

    # 存储结果到记忆
    orchestrator.store_memory(
        content=action.description,
        bucket_type=SemanticBucketType.TASK_RESULT,
    )
```

---

## 完整端到端示例

### 基础 Agent Loop 集成

```python
from scripts import (
    create_context_orchestrator,
    GlobalStateCapture,
    SemanticBucketType,
    StateEventType,
)

class AgentLoopWithMemory:
    """集成 Agent Memory System 的 Agent Loop"""
    
    def __init__(self, user_id: str, session_id: str):
        # 初始化记忆系统
        self.orchestrator = create_context_orchestrator(
            user_id=user_id,
            session_id=session_id,
            max_context_tokens=32000,
        )
        
        # 初始化状态捕捉
        self.state_capture = GlobalStateCapture(
            user_id=user_id,
            storage_path="./state_storage",
        )
        
        # 订阅关键事件
        self.subscription_id = self.state_capture.subscribe(
            event_types=[
                StateEventType.PHASE_CHANGE,
                StateEventType.TASK_SWITCH,
                StateEventType.ERROR_OCCURRED,
            ],
            callback=self._on_state_change,
        )
        
        self.max_iterations = 10
    
    def run(self, user_input: str, tools: list) -> str:
        """执行 Agent Loop"""
        try:
            # 存储用户意图
            self.orchestrator.store_memory(
                content=user_input,
                bucket_type=SemanticBucketType.USER_INTENT,
            )
            
            # 主循环
            for iteration in range(self.max_iterations):
                # 准备上下文(包含历史记忆)
                context = self.orchestrator.prepare_context(
                    user_input=user_input,
                    system_instruction="你是一个智能助手",
                )
                
                # 执行决策和行动
                action = self._decide(context, tools)
                result = self._execute_action(action)
                
                # 存储结果到记忆
                self.orchestrator.store_memory(
                    content=result,
                    bucket_type=SemanticBucketType.TASK_RESULT,
                )
                
                # 同步状态
                self.state_capture.sync_from_langgraph(
                    state={"phase": "executing", "iteration": iteration},
                    node_name="executor",
                )
                
                if self._is_task_complete(result):
                    break
            
            return self._generate_final_answer()
            
        except Exception as e:
            print(f"Agent Loop 执行失败: {e}")
            return self._fallback_response(user_input)
        
        finally:
            # 清理资源
            stats = self.orchestrator.end_session()
            self.state_capture.unsubscribe(self.subscription_id)
    
    def _on_state_change(self, event):
        """状态变化回调"""
        print(f"[状态变化] {event.event_type}: {event.details}")

# 使用示例
agent = AgentLoopWithMemory("user_123", "session_456")
result = agent.run("帮我分析数据", [analyze_tool])
```

### LangGraph 深度集成

```python
from langgraph.graph import StateGraph, END
from scripts import create_context_orchestrator, SemanticBucketType

def build_agent_graph(user_id: str, session_id: str):
    """构建集成记忆系统的 LangGraph"""
    
    orchestrator = create_context_orchestrator(
        user_id=user_id,
        session_id=session_id,
    )
    
    def planner_node(state):
        context = orchestrator.prepare_context(
            user_input=state["user_input"],
            system_instruction="你是规划专家",
        )
        plan = generate_plan(context)
        orchestrator.store_memory(str(plan), SemanticBucketType.PLAN)
        return {"plan": plan}
    
    def executor_node(state):
        context = orchestrator.prepare_context(state["plan"])
        result = execute_plan(context, state["plan"])
        orchestrator.store_memory(result, SemanticBucketType.TASK_RESULT)
        return {"result": result}
    
    # 构建图
    graph = StateGraph(AgentState)
    graph.add_node("planner", planner_node)
    graph.add_node("executor", executor_node)
    graph.set_entry_point("planner")
    graph.add_edge("planner", "executor")
    graph.add_edge("executor", END)
    
    return graph.compile(), orchestrator
```

---

## 故障排查指南

### 常见问题

#### 问题 1: 导入错误

```python
ImportError: cannot import name 'GlobalStateCapture'
```

**解决方案**: 检查导入路径
```python
from scripts import GlobalStateCapture  # ✓ 正确
from scripts.state_capture import GlobalStateCapture  # ✗ 错误
```

#### 问题 2: Token 预算不足

```python
ValueError: Context exceeds token budget: 35000 > 32000
```

**解决方案**:
```python
# 方案 1: 增加预算
orchestrator = create_context_orchestrator(max_context_tokens=48000)

# 方案 2: 启用压缩
context = orchestrator.prepare_context(enable_compression=True)

# 方案 3: 限制历史
context = orchestrator.prepare_context(max_history_turns=5)
```

#### 问题 3: Redis 连接失败

```python
ConnectionError: Could not connect to Redis
```

**解决方案**:
```python
# 使用文件存储降级
orchestrator = create_context_orchestrator(
    storage_backend="file",  # 降级到文件存储
)
```

### 调试技巧

```python
import logging

# 启用详细日志
logging.basicConfig(level=logging.DEBUG)

# 获取统计信息
stats = orchestrator.get_stats()
print(f"Token 使用: {stats['token_usage']}")
print(f"记忆数量: {stats['memory_count']}")
print(f"缓存命中率: {stats['cache_hit_rate']:.1%}")

# 检查状态同步
state_snapshot = state_capture.get_current_state()
print(f"当前阶段: {state_snapshot.phase}")
```

---

## 性能优化配置

### Token 预算管理

```python
# 根据任务复杂度调整预算
orchestrator = create_context_orchestrator(
    max_context_tokens=48000,  # 复杂任务增加预算
)

# 动态压缩
context = orchestrator.prepare_context(
    enable_compression=True,  # 启用自动压缩
    compression_threshold=0.8,  # 压缩阈值
)
```

### 缓存策略

```python
# 配置缓存
orchestrator = create_context_orchestrator(
    cache_config={
        "enable_l1_cache": True,  # 内存缓存
        "l1_max_size": 1000,      # 最大条目数
    }
)

# 预热缓存
orchestrator.warmup_cache(hot_memories=["user_profile"])
```

---

## 实际应用场景

### 场景 1: 代码助手

```python
class CodeAssistant:
    """带记忆的代码助手"""
    
    def __init__(self, user_id: str):
        self.orchestrator = create_context_orchestrator(
            user_id=user_id,
            max_context_tokens=48000,  # 代码需要更多上下文
        )
    
    def analyze_code(self, code: str):
        # 存储代码上下文
        self.orchestrator.store_memory(
            f"用户代码: {code[:200]}...",
            SemanticBucketType.CODE,
        )
        
        # 准备上下文(包含编程习惯)
        context = self.orchestrator.prepare_context(
            user_input=code,
            system_instruction="你是代码分析专家",
        )
        
        return analyze(context, code)
```

### 场景 2: 多轮对话

```python
class ChatAgent:
    """带记忆的对话代理"""
    
    def __init__(self, user_id: str):
        self.orchestrator = create_context_orchestrator(user_id=user_id)
    
    def chat(self, message: str):
        # 准备上下文(包含对话历史)
        context = self.orchestrator.prepare_context(
            user_input=message,
            system_instruction="记住用户偏好和历史对话",
        )
        
        response = generate_response(context, message)
        
        # 存储对话
        self.orchestrator.store_memory(
            response,
            SemanticBucketType.CONVERSATION,
        )
        
        return response
```

---

## 依赖和环境要求

### 版本要求

```txt
pydantic>=2.0.0          # 数据模型验证
typing-extensions>=4.0.0 # 类型扩展
langgraph>=0.0.20        # LangGraph 框架(可选)
tiktoken>=0.5.0          # Token 计数
redis>=4.5.0             # Redis 存储(可选)
mmh3>=3.0.0              # 布隆过滤器(可选)
cryptography>=41.0.0     # 数据加密(可选)
```

### 环境配置

```python
from dataclasses import dataclass

@dataclass
class MemoryConfig:
    """记忆系统配置"""
    storage_base_path: str = "./memory_data"
    max_context_tokens: int = 32000
    redis_host: str = "localhost"  # 可选
```

---

## 配置参数详解

### ContextOrchestrator 参数

| 参数 | 类型 | 默认值 | 说明 | 推荐值 |
|------|------|--------|------|--------|
| `user_id` | str | 必填 | 用户唯一标识 | - |
| `session_id` | str | 必填 | 会话唯一标识 | - |
| `max_context_tokens` | int | 32000 | 最大上下文 Token 数 | 简单任务: 16000<br>复杂任务: 48000 |
| `enable_compression` | bool | True | 启用自动压缩 | 建议启用 |
| `max_history_turns` | int | 10 | 最大历史轮次 | 对话: 20<br>任务: 5 |

### GlobalStateCapture 参数

| 参数 | 类型 | 默认值 | 说明 |
|------|------|--------|------|
| `user_id` | str | 必填 | 用户唯一标识 |
| `storage_path` | str | "./state_storage" | 状态存储路径 |
| `max_checkpoints` | int | 100 | 最大检查点数 |

---

## 最佳实践

### 1. 使用元技能常驻

Agent Memory System 是元技能,应在 Agent Loop 启动时初始化,在整个循环周期内复用:

```python
# ✓ 正确:在启动时初始化
orchestrator = create_context_orchestrator(user_id, session_id)

for step in agent_loop:
    # 复用同一个实例
    context = orchestrator.prepare_context(step.input)

# ✗ 错误:每次循环都创建新实例
for step in agent_loop:
    orchestrator = create_context_orchestrator(user_id, session_id)
```

### 2. 合理配置 Token 预算

根据任务复杂度调整 `max_context_tokens`:

```python
# 简单任务(对话、查询)
max_context_tokens=16000

# 中等任务(分析、总结)
max_context_tokens=32000

# 复杂任务(代码分析、多步推理)
max_context_tokens=48000
```

### 3. 事件驱动更新

仅在关键事件时更新记忆,避免频繁的状态同步:

```python
# ✓ 正确:关键事件时更新
if task_complete or error_occurred:
    orchestrator.store_memory(result, bucket_type)

# ✗ 错误:每次循环都更新
for step in agent_loop:
    orchestrator.store_memory(result, bucket_type)
```

### 4. 错误处理和降级

捕获并处理状态同步错误,使用降级策略确保可用性:

```python
try:
    context = orchestrator.prepare_context(user_input)
except Exception as e:
    # 降级策略:使用简化上下文
    context = simplified_context(user_input)
    logger.warning(f"Context preparation failed, using fallback: {e}")
```

---

## 关键集成点

### 1. 初始化阶段
- 创建 `GlobalStateCapture` 实例
- 创建 `ContextOrchestrator` 实例
- 订阅关键事件

### 2. 执行阶段
- 每个状态变化时调用 `sync_from_langgraph`
- 使用 `prepare_context` 获取历史上下文
- 使用 `store_memory` 存储执行结果

### 3. 清理阶段
- 调用 `end_session` 保存会话统计
- 取消事件订阅

---

## 参考文档

- [架构总览](architecture_overview.md)
- [Agent Loops 进阶指导](agent_loops_advanced.md)
- [状态管理类 API](api_class_reference.md#三状态管理类)
- [上下文编排类 API](api_class_reference.md#七上下文编排类)
- [使用指南](usage_guide.md)

---

> **文档版本**:2.0  
> **更新日期**:2024年  
> **适用场景**:Agent Memory System 集成、状态同步、事件订阅、性能优化