文件内容
scripts/long_term.py
# Agent Memory System
# Copyright (C) 2024 kiwifruit
#
# This file is part of Agent Memory System.
#
# Agent Memory System is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Agent Memory System is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Agent Memory System. If not, see <https://www.gnu.org/licenses/>.
"""
Agent Memory System - 长期记忆模块
=== 依赖与环境声明 ===
- 运行环境:Python >=3.9
- 直接依赖:
* pydantic: >=2.0.0
- 用途:数据模型验证
* typing-extensions: >=4.0.0
- 用途:类型扩展支持
- 标准配置文件:
```text
# requirements.txt
pydantic>=2.0.0
typing-extensions>=4.0.0
```
=== 声明结束 ===
安全提醒:定期运行 pip audit 进行安全审计
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any
from .type_defs import (
LongTermMemoryContainer,
UserProfileMemory,
UserProfileData,
ProceduralMemory,
ProceduralMemoryData,
NarrativeMemory,
NarrativeMemoryData,
SemanticMemory,
SemanticMemoryData,
EmotionalMemory,
EmotionalMemoryData,
HeatLevel,
MemoryType,
ToolUsageRecord,
ToolOptimalContext,
ToolCombinationPattern,
NeuroticismTendency,
GrowthMilestone,
EmotionState,
ConceptDefinition,
# 反思类型
ReflectionMemoryItem,
MemoryCategory,
)
from .heat_manager import HeatManager
class LongTermMemoryManager:
"""
长期记忆管理器
负责持久化的经验与知识仓库,包括:
- 用户画像管理
- 程序性记忆管理(含工具使用模式)
- 叙事记忆管理
- 语义记忆管理
- 情感记忆管理
- 冷热度分层管理
"""
def __init__(
self,
user_id: str = "default_user",
storage_path: str = "./memory_storage",
) -> None:
"""
初始化长期记忆管理器
Args:
user_id: 用户ID
storage_path: 存储路径
"""
self.user_id: str = user_id
self.storage_path: Path = Path(storage_path)
self.storage_path.mkdir(parents=True, exist_ok=True)
self._container: LongTermMemoryContainer = LongTermMemoryContainer(
user_id=user_id
)
self._heat_manager: HeatManager = HeatManager()
# 尝试加载已有记忆
self._load_from_storage()
def _load_from_storage(self) -> None:
"""
从存储加载记忆(内部方法)
"""
storage_file: Path = self.storage_path / f"{self.user_id}_memory.json"
if storage_file.exists():
try:
with open(storage_file, "r", encoding="utf-8") as f:
data: dict[str, Any] = json.load(f)
self._container = LongTermMemoryContainer.model_validate(data)
except (json.JSONDecodeError, ValueError):
# 加载失败,使用空容器
pass
def _save_to_storage(self) -> None:
"""
保存记忆到存储(内部方法)
"""
storage_file: Path = self.storage_path / f"{self.user_id}_memory.json"
self._container.last_updated = datetime.now()
with open(storage_file, "w", encoding="utf-8") as f:
json.dump(self._container.model_dump(mode="json"), f, ensure_ascii=False, indent=2)
def get_user_profile(self) -> UserProfileData | None:
"""
获取用户画像
Returns:
用户画像数据,如果不存在返回 None
"""
if self._container.user_profile:
return self._container.user_profile.data
return None
def update_user_profile(self, profile_data: dict[str, Any]) -> str:
"""
更新用户画像
Args:
profile_data: 用户画像数据
Returns:
记忆ID
"""
if self._container.user_profile is None:
# 创建新画像
memory_id: str = f"profile_{uuid.uuid4().hex[:12]}"
self._container.user_profile = UserProfileMemory(
memory_id=memory_id,
user_id=self.user_id,
data=UserProfileData.model_validate(profile_data),
)
else:
# 更新现有画像
current_data: UserProfileData = self._container.user_profile.data
# 合并身份标签
if "identity" in profile_data:
for identity in profile_data["identity"]:
if identity not in current_data.identity:
current_data.identity.append(identity)
# 更新技术背景
if "technical_background" in profile_data:
tb: dict[str, Any] = profile_data["technical_background"]
if "domains" in tb:
for domain in tb["domains"]:
if domain not in current_data.technical_background.domains:
current_data.technical_background.domains.append(domain)
if "expertise_level" in tb:
current_data.technical_background.expertise_level = tb["expertise_level"]
# 更新沟通风格
if "communication_style" in profile_data:
cs: dict[str, Any] = profile_data["communication_style"]
if "style" in cs:
current_data.communication_style.style = cs["style"]
if "preference" in cs:
current_data.communication_style.preference = cs["preference"]
# 更新决策模式
if "decision_pattern" in profile_data:
dp: dict[str, Any] = profile_data["decision_pattern"]
if "type" in dp:
current_data.decision_pattern.type = dp["type"]
if "focus" in dp:
current_data.decision_pattern.focus = dp["focus"]
current_data.version += 1
self._save_to_storage()
return self._container.user_profile.memory_id
def get_procedural_memory(self) -> ProceduralMemoryData | None:
"""
获取程序性记忆
Returns:
程序性记忆数据
"""
if self._container.procedural:
return self._container.procedural.data
return None
def update_procedural_memory(self, procedural_data: dict[str, Any]) -> str:
"""
更新程序性记忆
Args:
procedural_data: 程序性记忆数据
Returns:
记忆ID
"""
if self._container.procedural is None:
memory_id: str = f"procedural_{uuid.uuid4().hex[:12]}"
self._container.procedural = ProceduralMemory(
memory_id=memory_id,
user_id=self.user_id,
data=ProceduralMemoryData.model_validate(procedural_data),
)
else:
current_data: ProceduralMemoryData = self._container.procedural.data
# 合并决策模式
if "decision_patterns" in procedural_data:
for pattern in procedural_data["decision_patterns"]:
current_data.decision_patterns.append(
{
"pattern_id": pattern.get("pattern_id", f"dp_{uuid.uuid4().hex[:8]}"),
"trigger_condition": pattern.get("trigger_condition", ""),
"workflow": pattern.get("workflow", []),
"confidence": pattern.get("confidence", 0.5),
"usage_count": pattern.get("usage_count", 1),
"success_rate": pattern.get("success_rate", 0.5),
}
)
# 更新操作偏好
if "operation_preferences" in procedural_data:
current_data.operation_preferences.update(
procedural_data["operation_preferences"]
)
self._save_to_storage()
return self._container.procedural.memory_id
def update_tool_usage(
self,
tool_name: str,
task_type: str,
outcome: str,
user_feedback: float | None = None,
effectiveness_score: float = 0.5,
# P0: 工具调用状态关联参数
checkpoint_id: str | None = None,
phase: str | None = None,
scenario: str | None = None,
user_state: str | None = None,
) -> str:
"""
更新工具使用记忆
Args:
tool_name: 工具名称
task_type: 任务类型
outcome: 结果(success/failure)
user_feedback: 用户反馈评分
effectiveness_score: 有效度分数
checkpoint_id: 关联的状态快照ID(来自GlobalStateCapture)
phase: 调用时的阶段
scenario: 调用时的场景
user_state: 调用时的用户状态
Returns:
记录ID
"""
if self._container.procedural is None:
memory_id: str = f"procedural_{uuid.uuid4().hex[:12]}"
self._container.procedural = ProceduralMemory(
memory_id=memory_id,
user_id=self.user_id,
data=ProceduralMemoryData(),
)
record: ToolUsageRecord = ToolUsageRecord(
record_id=f"tool_{uuid.uuid4().hex[:8]}",
timestamp=datetime.now(),
task_type=task_type,
tool_name=tool_name,
effectiveness_score=effectiveness_score,
outcome=outcome,
user_feedback=user_feedback,
checkpoint_id=checkpoint_id,
phase=phase,
scenario=scenario,
user_state=user_state,
)
self._container.procedural.data.tool_effectiveness_records.append(record)
self._save_to_storage()
return record.record_id
def get_tool_recommendation(
self,
task_type: str,
constraints: list[str] | None = None,
# P0: 状态感知过滤参数
phase: str | None = None,
scenario: str | None = None,
user_state: str | None = None,
state_weight: float = 0.3, # 状态匹配的权重
) -> dict[str, Any]:
"""
获取工具推荐(状态感知增强版)
Args:
task_type: 任务类型
constraints: 约束条件
phase: 当前阶段(用于状态感知过滤)
scenario: 当前场景(用于状态感知过滤)
user_state: 当前用户状态(用于状态感知过滤)
state_weight: 状态匹配的权重系数 (0.0-1.0)
Returns:
推荐结果
"""
default_result: dict[str, Any] = {
"tool": "代码解释器",
"confidence": 0.5,
"reasons": ["无历史记录,使用默认推荐"],
"all_candidates": [],
"state_filtered": False,
}
if self._container.procedural is None:
return default_result
records: list[ToolUsageRecord] = self._container.procedural.data.tool_effectiveness_records
# 按任务类型过滤
filtered_records: list[ToolUsageRecord] = [
r for r in records if r.task_type == task_type or task_type == ""
]
if not filtered_records:
return default_result
# 状态感知过滤:如果有状态参数,优先考虑状态匹配的记录
state_filtered_records: list[ToolUsageRecord] = []
has_state_filter: bool = phase is not None or scenario is not None or user_state is not None
if has_state_filter:
for record in filtered_records:
match_score: float = 0.0
match_count: int = 0
if phase is not None and record.phase == phase:
match_score += 1.0
match_count += 1
if scenario is not None and record.scenario == scenario:
match_score += 1.0
match_count += 1
if user_state is not None and record.user_state == user_state:
match_score += 1.0
match_count += 1
if match_count > 0:
state_filtered_records.append(record)
# 如果有状态匹配的记录,优先使用;否则使用全部记录
effective_records: list[ToolUsageRecord] = (
state_filtered_records if state_filtered_records else filtered_records
)
# 统计各工具的成功率
tool_stats: dict[str, dict[str, float]] = {}
for record in effective_records:
tool: str = record.tool_name
if tool not in tool_stats:
tool_stats[tool] = {"total": 0.0, "success": 0.0, "feedback": 0.0}
tool_stats[tool]["total"] += 1
if record.outcome == "success":
tool_stats[tool]["success"] += 1
if record.user_feedback:
tool_stats[tool]["feedback"] += record.user_feedback
if not tool_stats:
return default_result
# 计算综合分数
best_tool: str = ""
best_score: float = 0.0
for tool, stats in tool_stats.items():
success_rate: float = stats["success"] / stats["total"] if stats["total"] > 0 else 0
avg_feedback: float = stats["feedback"] / stats["total"] if stats["total"] > 0 else 0
# 基础分数
base_score: float = success_rate * 0.5 + avg_feedback / 5.0 * 0.3 + stats["total"] / 10 * 0.2
# 如果是状态过滤的结果,增加权重
if state_filtered_records:
base_score = base_score * (1 + state_weight)
if base_score > best_score:
best_score = base_score
best_tool = tool
return {
"tool": best_tool,
"confidence": min(0.95, best_score),
"reasons": [
f"历史成功率: {tool_stats[best_tool]['success'] / tool_stats[best_tool]['total']:.0%}" if tool_stats[best_tool]["total"] > 0 else "无历史记录",
f"使用次数: {int(tool_stats[best_tool]['total'])}",
],
"all_candidates": [
{
"tool": t,
"success_rate": s["success"] / s["total"] if s["total"] > 0 else 0,
"usage_count": int(s["total"]),
"avg_feedback": s["feedback"] / s["total"] if s["total"] > 0 and s["feedback"] > 0 else None,
}
for t, s in sorted(tool_stats.items(), key=lambda x: x[1]["success"] / max(x[1]["total"], 1), reverse=True)
],
"state_filtered": has_state_filter and len(state_filtered_records) > 0,
"state_match_count": len(state_filtered_records) if has_state_filter else 0,
}
def get_tool_effectiveness_summary(
self,
tool_name: str | None = None,
task_type: str | None = None,
) -> dict[str, Any]:
"""
获取工具效果汇总统计
Args:
tool_name: 工具名称(可选,不指定则返回所有工具)
task_type: 任务类型(可选,不指定则返回所有任务类型)
Returns:
工具效果汇总数据
"""
if self._container.procedural is None:
return {"tools": [], "total_records": 0}
records: list[ToolUsageRecord] = self._container.procedural.data.tool_effectiveness_records
# 过滤条件
if tool_name:
records = [r for r in records if r.tool_name == tool_name]
if task_type:
records = [r for r in records if r.task_type == task_type]
# 按工具分组统计
tool_summary: dict[str, dict[str, Any]] = {}
for record in records:
tool: str = record.tool_name
if tool not in tool_summary:
tool_summary[tool] = {
"tool_name": tool,
"total_uses": 0,
"success_count": 0,
"failure_count": 0,
"total_effectiveness": 0.0,
"feedback_sum": 0.0,
"feedback_count": 0,
"task_types": set(),
}
tool_summary[tool]["total_uses"] += 1
tool_summary[tool]["total_effectiveness"] += record.effectiveness_score
tool_summary[tool]["task_types"].add(record.task_type)
if record.outcome == "success":
tool_summary[tool]["success_count"] += 1
else:
tool_summary[tool]["failure_count"] += 1
if record.user_feedback is not None:
tool_summary[tool]["feedback_sum"] += record.user_feedback
tool_summary[tool]["feedback_count"] += 1
# 计算最终统计
result: dict[str, Any] = {
"tools": [],
"total_records": len(records),
}
for tool, stats in tool_summary.items():
success_rate: float = stats["success_count"] / stats["total_uses"] if stats["total_uses"] > 0 else 0
avg_effectiveness: float = stats["total_effectiveness"] / stats["total_uses"] if stats["total_uses"] > 0 else 0
avg_feedback: float = stats["feedback_sum"] / stats["feedback_count"] if stats["feedback_count"] > 0 else 0
result["tools"].append({
"tool_name": stats["tool_name"],
"total_uses": stats["total_uses"],
"success_rate": round(success_rate, 3),
"failure_rate": round(1 - success_rate, 3),
"avg_effectiveness": round(avg_effectiveness, 3),
"avg_user_feedback": round(avg_feedback, 2) if stats["feedback_count"] > 0 else None,
"task_types": list(stats["task_types"]),
})
# 按成功率排序
result["tools"].sort(key=lambda x: x["success_rate"], reverse=True)
return result
def record_tool_combination(
self,
tool_sequence: list[str],
task_pattern: str,
effectiveness: float,
) -> str:
"""
记录工具组合使用模式
Args:
tool_sequence: 工具使用序列,如 ["web_search", "code_interpreter", "file_write"]
task_pattern: 任务模式描述,如 "研究并实现功能"
effectiveness: 整体效果评分 (0.0-1.0)
Returns:
模式ID
"""
if self._container.procedural is None:
self._container.procedural = ProceduralMemory(
memory_id=f"procedural_{uuid.uuid4().hex[:12]}",
user_id=self.user_id,
data=ProceduralMemoryData(),
)
# 检查是否已存在相同序列
for pattern in self._container.procedural.data.tool_combination_patterns:
if pattern.sequence == tool_sequence and pattern.task_pattern == task_pattern:
# 更新已有模式的统计数据
new_count: int = pattern.usage_count + 1
new_avg: float = (pattern.effectiveness_avg * pattern.usage_count + effectiveness) / new_count
pattern.usage_count = new_count
pattern.effectiveness_avg = new_avg
self._save_to_storage()
return f"updated_{pattern.sequence[0]}_{pattern.usage_count}"
# 创建新模式
pattern_id: str = f"combo_{uuid.uuid4().hex[:8]}"
new_pattern: ToolCombinationPattern = ToolCombinationPattern(
sequence=tool_sequence,
task_pattern=task_pattern,
effectiveness_avg=effectiveness,
usage_count=1,
)
self._container.procedural.data.tool_combination_patterns.append(new_pattern)
self._save_to_storage()
return pattern_id
def get_tool_combination_patterns(
self,
task_pattern: str | None = None,
min_effectiveness: float = 0.0,
) -> list[dict[str, Any]]:
"""
获取工具组合模式
Args:
task_pattern: 任务模式过滤(可选)
min_effectiveness: 最小效果阈值(可选)
Returns:
工具组合模式列表
"""
if self._container.procedural is None:
return []
patterns: list[ToolCombinationPattern] = self._container.procedural.data.tool_combination_patterns
# 过滤
if task_pattern:
patterns = [p for p in patterns if task_pattern.lower() in p.task_pattern.lower()]
if min_effectiveness > 0:
patterns = [p for p in patterns if p.effectiveness_avg >= min_effectiveness]
# 按效果排序
patterns = sorted(patterns, key=lambda x: x.effectiveness_avg, reverse=True)
return [
{
"sequence": p.sequence,
"task_pattern": p.task_pattern,
"effectiveness_avg": round(p.effectiveness_avg, 3),
"usage_count": p.usage_count,
}
for p in patterns
]
def get_optimal_tool_context(self, tool_name: str) -> dict[str, Any]:
"""
获取工具最优使用场景
Args:
tool_name: 工具名称
Returns:
最优场景和规避场景信息
"""
if self._container.procedural is None:
return {"optimal_scenarios": [], "avoid_scenarios": [], "has_data": False}
# 从 tool_optimal_contexts 获取
if tool_name in self._container.procedural.data.tool_optimal_contexts:
context: ToolOptimalContext = self._container.procedural.data.tool_optimal_contexts[tool_name]
return {
"optimal_scenarios": context.optimal_scenarios,
"avoid_scenarios": context.avoid_scenarios,
"has_data": True,
}
# 基于历史记录推导
records: list[ToolUsageRecord] = [
r for r in self._container.procedural.data.tool_effectiveness_records
if r.tool_name == tool_name
]
if not records:
return {"optimal_scenarios": [], "avoid_scenarios": [], "has_data": False}
# 分析成功和失败的场景
optimal: list[dict[str, Any]] = []
avoid: list[dict[str, Any]] = []
success_records: list[ToolUsageRecord] = [r for r in records if r.outcome == "success"]
failure_records: list[ToolUsageRecord] = [r for r in records if r.outcome != "success"]
# 成功场景聚合
task_success: dict[str, list[float]] = {}
for r in success_records:
if r.task_type not in task_success:
task_success[r.task_type] = []
task_success[r.task_type].append(r.effectiveness_score)
for task_type, scores in task_success.items():
if len(scores) >= 2: # 至少2次成功才纳入
optimal.append({
"task_type": task_type,
"success_count": len(scores),
"avg_effectiveness": round(sum(scores) / len(scores), 3),
})
# 失败场景聚合
task_failure: dict[str, int] = {}
for r in failure_records:
task_failure[r.task_type] = task_failure.get(r.task_type, 0) + 1
for task_type, count in task_failure.items():
if count >= 2: # 至少2次失败才规避
avoid.append({
"task_type": task_type,
"failure_count": count,
})
return {
"optimal_scenarios": sorted(optimal, key=lambda x: x["success_count"], reverse=True),
"avoid_scenarios": sorted(avoid, key=lambda x: x["failure_count"], reverse=True),
"has_data": True,
"derived_from_history": True,
}
def update_neuroticism_tendency(
self, adjustment: float, source: str
) -> float:
"""
更新神经质倾向
Args:
adjustment: 调整值
source: 来源描述
Returns:
更新后的分数
"""
if self._container.procedural is None:
self._container.procedural = ProceduralMemory(
memory_id=f"procedural_{uuid.uuid4().hex[:12]}",
user_id=self.user_id,
data=ProceduralMemoryData(),
)
current_score: float = self._container.procedural.data.neuroticism_tendency.score
new_score: float = max(-1.0, min(1.0, current_score + adjustment))
self._container.procedural.data.neuroticism_tendency.score = new_score
self._container.procedural.data.neuroticism_tendency.derived_from.append(source)
self._save_to_storage()
return new_score
def get_neuroticism_tendency(self) -> float:
"""
获取神经质倾向分数
Returns:
神经质倾向分数 (-1.0 ~ 1.0)
"""
if self._container.procedural:
return self._container.procedural.data.neuroticism_tendency.score
return 0.0
def update_narrative_memory(self, narrative_data: dict[str, Any]) -> str:
"""
更新叙事记忆
Args:
narrative_data: 叙事记忆数据
Returns:
记忆ID
"""
if self._container.narrative is None:
memory_id: str = f"narrative_{uuid.uuid4().hex[:12]}"
self._container.narrative = NarrativeMemory(
memory_id=memory_id,
user_id=self.user_id,
data=NarrativeMemoryData.model_validate(narrative_data),
)
else:
current_data: NarrativeMemoryData = self._container.narrative.data
# 合并成长节点
if "growth_milestones" in narrative_data:
for milestone in narrative_data["growth_milestones"]:
current_data.growth_milestones.append(
GrowthMilestone(
timestamp=datetime.fromisoformat(milestone.get("timestamp", datetime.now().isoformat())),
event=milestone.get("event", ""),
significance=milestone.get("significance", ""),
importance_score=milestone.get("importance_score", 0.5),
)
)
# 更新当前身份
if "current_identity" in narrative_data:
for identity in narrative_data["current_identity"]:
if identity not in current_data.current_identity:
current_data.current_identity.append(identity)
self._save_to_storage()
return self._container.narrative.memory_id
def update_semantic_memory(self, semantic_data: dict[str, Any]) -> str:
"""
更新语义记忆
Args:
semantic_data: 语义记忆数据
Returns:
记忆ID
"""
if self._container.semantic is None:
memory_id: str = f"semantic_{uuid.uuid4().hex[:12]}"
self._container.semantic = SemanticMemory(
memory_id=memory_id,
user_id=self.user_id,
data=SemanticMemoryData.model_validate(semantic_data),
)
else:
current_data: SemanticMemoryData = self._container.semantic.data
# 合并核心概念
if "core_concepts" in semantic_data:
for concept in semantic_data["core_concepts"]:
existing: bool = False
for existing_concept in current_data.core_concepts:
if existing_concept.concept == concept.get("concept"):
existing_concept.usage_count += 1
existing = True
break
if not existing:
current_data.core_concepts.append(
ConceptDefinition(
concept=concept.get("concept", ""),
definition=concept.get("definition", ""),
attributes=concept.get("attributes", {}),
related_concepts=concept.get("related_concepts", []),
usage_count=1,
confidence=concept.get("confidence", 0.5),
)
)
self._save_to_storage()
return self._container.semantic.memory_id
def update_emotional_memory(self, emotional_data: dict[str, Any]) -> str:
"""
更新情感记忆
Args:
emotional_data: 情感记忆数据
Returns:
记忆ID
"""
if self._container.emotional is None:
memory_id: str = f"emotional_{uuid.uuid4().hex[:12]}"
self._container.emotional = EmotionalMemory(
memory_id=memory_id,
user_id=self.user_id,
data=EmotionalMemoryData.model_validate(emotional_data),
)
else:
current_data: EmotionalMemoryData = self._container.emotional.data
# 添加情绪状态
if "emotion_states" in emotional_data:
for state in emotional_data["emotion_states"]:
current_data.emotion_states.append(
EmotionState(
timestamp=datetime.fromisoformat(state.get("timestamp", datetime.now().isoformat())),
emotion_type=state.get("emotion_type", "neutral"),
intensity=state.get("intensity", 0.5),
trigger_context=state.get("trigger_context", ""),
topic=state.get("topic", ""),
decay_factor=state.get("decay_factor", 0.98),
)
)
self._save_to_storage()
return self._container.emotional.memory_id
def update_from_extractions(self, extractions: dict[str, Any]) -> dict[str, str]:
"""
从提炼结果更新记忆
Args:
extractions: 提炼结果
Returns:
更新的记忆ID映射
"""
result: dict[str, str] = {}
if extractions.get("user_profile"):
result["user_profile"] = self.update_user_profile(
extractions["user_profile"]
)
if extractions.get("procedural"):
result["procedural"] = self.update_procedural_memory(
extractions["procedural"]
)
if extractions.get("narrative"):
result["narrative"] = self.update_narrative_memory(
extractions["narrative"]
)
if extractions.get("semantic"):
result["semantic"] = self.update_semantic_memory(
extractions["semantic"]
)
if extractions.get("emotional"):
result["emotional"] = self.update_emotional_memory(
extractions["emotional"]
)
return result
def get_all_memories(self) -> LongTermMemoryContainer:
"""
获取所有长期记忆
Returns:
长期记忆容器
"""
return self._container
def apply_heat_policy(self) -> dict[str, Any]:
"""
应用冷热度策略
Returns:
应用结果统计
"""
result: dict[str, Any] = {
"total_processed": 0,
"migrations": [],
}
# 处理用户画像(通常保持热状态)
if self._container.user_profile:
self._container.user_profile.heat.heat_level = HeatLevel.HOT
self._container.user_profile.heat.heat_score = 100.0
result["total_processed"] += 1
# 处理程序性记忆
if self._container.procedural:
memory: ProceduralMemory = self._container.procedural
new_score: float = self._heat_manager.calculate_heat_score(
days_since_access=self._days_since(memory.heat.last_accessed_at),
access_count=memory.heat.access_count,
importance=0.8,
user_interaction=0.0,
)
new_level: HeatLevel = self._heat_manager.determine_level(new_score)
if new_level != memory.heat.heat_level:
result["migrations"].append(
{
"memory_id": memory.memory_id,
"from": memory.heat.heat_level.value,
"to": new_level.value,
}
)
memory.heat.heat_score = new_score
memory.heat.heat_level = new_level
result["total_processed"] += 1
# 处理叙事记忆
if self._container.narrative:
memory_n: NarrativeMemory = self._container.narrative
new_score_n: float = self._heat_manager.calculate_heat_score(
days_since_access=self._days_since(memory_n.heat.last_accessed_at),
access_count=memory_n.heat.access_count,
importance=0.6,
user_interaction=0.0,
)
new_level_n: HeatLevel = self._heat_manager.determine_level(new_score_n)
memory_n.heat.heat_score = new_score_n
memory_n.heat.heat_level = new_level_n
result["total_processed"] += 1
# 处理情感记忆
if self._container.emotional:
memory_e: EmotionalMemory = self._container.emotional
new_score_e: float = self._heat_manager.calculate_heat_score(
days_since_access=self._days_since(memory_e.heat.last_accessed_at),
access_count=memory_e.heat.access_count,
importance=0.5,
user_interaction=0.0,
)
new_level_e: HeatLevel = self._heat_manager.determine_level(new_score_e)
memory_e.heat.heat_score = new_score_e
memory_e.heat.heat_level = new_level_e
result["total_processed"] += 1
# 情感衰减
self._apply_emotion_decay()
self._save_to_storage()
return result
def _days_since(self, timestamp: datetime) -> float:
"""
计算距今天数(内部方法)
Args:
timestamp: 时间戳
Returns:
天数
"""
delta = datetime.now() - timestamp
return delta.total_seconds() / 86400
def _apply_emotion_decay(self) -> None:
"""
应用情感衰减(内部方法)
"""
if self._container.emotional is None:
return
now: datetime = datetime.now()
updated_states: list[EmotionState] = []
for state in self._container.emotional.data.emotion_states:
days_passed: float = (now - state.timestamp).total_seconds() / 86400
new_intensity: float = state.intensity * (state.decay_factor**days_passed)
# 保留强度大于 0.1 的情绪
if new_intensity > 0.1:
state.intensity = new_intensity
updated_states.append(state)
self._container.emotional.data.emotion_states = updated_states[
-50:
] # 保留最近 50 条
def get_memory_by_type(self, memory_type: MemoryType) -> Any | None:
"""
按类型获取记忆
Args:
memory_type: 记忆类型
Returns:
记忆数据
"""
mapping: dict[MemoryType, Any] = {
MemoryType.USER_PROFILE: self._container.user_profile,
MemoryType.PROCEDURAL: self._container.procedural,
MemoryType.NARRATIVE: self._container.narrative,
MemoryType.SEMANTIC: self._container.semantic,
MemoryType.EMOTIONAL: self._container.emotional,
}
return mapping.get(memory_type)
def clear_all_memories(self) -> None:
"""
清除所有记忆
警告:此操作不可逆
"""
self._container = LongTermMemoryContainer(user_id=self.user_id)
self._save_to_storage()
# ========================================================================
# 反思记忆管理(P0: 链式推理增强)
# ========================================================================
def store_reflection(
self,
item: ReflectionMemoryItem,
) -> str:
"""
存储反思记忆到长期记忆
反思记忆被持久化到 EXTENDED_REFLECTION 分类,
作为元学习训练数据使用。
Args:
item: 反思记忆项
Returns:
memory_id
"""
# 构建存储路径
reflection_dir: Path = self.storage_path / "reflections"
reflection_dir.mkdir(parents=True, exist_ok=True)
# 存储为独立文件
storage_file: Path = reflection_dir / f"{item.memory_id}.json"
with open(storage_file, "w", encoding="utf-8") as f:
json.dump(item.model_dump(mode="json"), f, ensure_ascii=False, indent=2)
# 更新索引
self._container.reflection_index.append({
"memory_id": item.memory_id,
"timestamp": item.timestamp.isoformat(),
"trigger_type": item.trigger_type.value,
"outcome": item.outcome.value,
"learning_value": item.learning_value.value,
"step_index": item.step_index,
"task_type": item.task_type,
})
self._save_to_storage()
return item.memory_id
def get_memories_by_category(
self,
category: str,
limit: int = 100,
) -> list[dict[str, Any]]:
"""
按分类获取记忆
Args:
category: 记忆分类(如 EXTENDED_REFLECTION)
limit: 最大返回数量
Returns:
记忆列表
"""
if category == MemoryCategory.EXTENDED_REFLECTION.value:
return self._get_reflection_memories(limit)
# 其他分类(可扩展)
return []
def _get_reflection_memories(
self,
limit: int = 100,
) -> list[dict[str, Any]]:
"""
获取反思记忆列表
Args:
limit: 最大返回数量
Returns:
反思记忆列表
"""
reflection_dir: Path = self.storage_path / "reflections"
if not reflection_dir.exists():
return []
# 读取所有反思文件
memories: list[dict[str, Any]] = []
for file_path in reflection_dir.glob("*.json"):
try:
with open(file_path, "r", encoding="utf-8") as f:
data: dict[str, Any] = json.load(f)
memories.append(data)
except (json.JSONDecodeError, IOError):
continue
# 按时间排序(最新的在前)
memories.sort(
key=lambda x: x.get("timestamp", ""),
reverse=True,
)
return memories[:limit]
def get_reflection_statistics(self) -> dict[str, Any]:
"""
获取反思记忆统计信息
Returns:
统计信息
"""
reflections: list[dict[str, Any]] = self._get_reflection_memories(limit=1000)
if not reflections:
return {
"total_count": 0,
"outcome_distribution": {},
"learning_value_distribution": {},
"trigger_type_distribution": {},
"task_type_distribution": {},
}
# 统计分布
outcome_dist: dict[str, int] = {}
learning_dist: dict[str, int] = {}
trigger_dist: dict[str, int] = {}
task_dist: dict[str, int] = {}
for r in reflections:
# 结果分布
outcome: str = r.get("outcome", "unknown")
outcome_dist[outcome] = outcome_dist.get(outcome, 0) + 1
# 学习价值分布
lv: str = r.get("learning_value", "low")
learning_dist[lv] = learning_dist.get(lv, 0) + 1
# 触发类型分布
tt: str = r.get("trigger_type", "unknown")
trigger_dist[tt] = trigger_dist.get(tt, 0) + 1
# 任务类型分布
task_type: str = r.get("task_type", "unknown")
task_dist[task_type] = task_dist.get(task_type, 0) + 1
return {
"total_count": len(reflections),
"outcome_distribution": outcome_dist,
"learning_value_distribution": learning_dist,
"trigger_type_distribution": trigger_dist,
"task_type_distribution": task_dist,
"high_value_count": learning_dist.get("high", 0),
"correction_rate": outcome_dist.get("corrected", 0) / len(reflections) if reflections else 0,
}
# ============================================================================
# 导出
# ============================================================================
__all__ = ["LongTermMemoryManager"]