文件预览

bootstrap_context.py

查看 MemCore 记忆核心 技能包中的文件内容。

文件内容

scripts/memcore/bootstrap_context.py

"""
Bootstrap Context Generator: 生成 MEMORY_BRIEF.md 替代全量 MEMORY.md 注入。

策略:
- MEMORY.md(14KB≈3500 tokens)→ MEMORY_BRIEF.md(≤500 tokens)
- 启动时只注入精简简报,需要时通过三层检索按需获取
- MEMORY.md 保持完整不修改,随时可回退
"""

import json
import sqlite3
import subprocess
import sys
from contextlib import closing
from datetime import date, timedelta
from pathlib import Path
from typing import Optional


def count_tokens(text: str) -> int:
    """Return a rough token estimate for *text*.

    Heuristic weights: 0.5 per character in the U+4E00-U+9FFF range, 1.3 per
    run of ASCII letters, plus 0.1 per character of the whole string to
    account for punctuation and line breaks. The sum is truncated to an int.
    """
    import re

    cjk_count = sum(1 for _ in re.finditer(r'[\u4e00-\u9fff]', text))
    word_count = sum(1 for _ in re.finditer(r'[a-zA-Z]+', text))
    overhead = len(text) * 0.1  # punctuation / newlines
    estimate = cjk_count * 0.5 + word_count * 1.3 + overhead
    return int(estimate)


def generate_brief(
    trace_db: Optional[str] = None,
    pattern_db: Optional[str] = None,
    workspace_root: Optional[str] = None,
    max_tokens: int = 500,
) -> str:
    """Build the startup brief that is injected instead of the full MEMORY.md.

    The brief is assembled from up to five optional sections; each one is
    skipped when its backing file/database is missing or yields nothing:
    identity lines from SOUL.md, top patterns, recent high-value trace
    reflections, pending taskboard items, and a feedback summary.

    Args:
        trace_db: Path to the trace index database. Defaults to
            ``~/.openclaw/trace_index.db``.
        pattern_db: Path to the pattern index database. Defaults to
            ``~/.openclaw/pattern_index.db``.
        workspace_root: Workspace directory holding SOUL.md and
            ``memory/taskboard.md``. Defaults to ``~/.openclaw/workspace``.
        max_tokens: Token budget (as estimated by ``count_tokens``) above
            which the brief is trimmed.

    Returns:
        The brief as Markdown text.
    """
    openclaw_home = Path.home() / ".openclaw"
    workspace_root = (
        openclaw_home / "workspace" if workspace_root is None else Path(workspace_root)
    )
    trace_db = openclaw_home / "trace_index.db" if trace_db is None else Path(trace_db)
    pattern_db = (
        openclaw_home / "pattern_index.db" if pattern_db is None else Path(pattern_db)
    )

    today = date.today()
    sections = []

    # -- Section 1: identity core (only the name / form-of-address bullets) --
    soul = workspace_root / "SOUL.md"
    if soul.exists():
        identity = "\n".join(
            line.strip()
            for line in soul.read_text(encoding="utf-8").split("\n")
            if line.startswith(("- **Name:**", "- **What to call"))
        ).strip()
        if identity:
            sections.append(f"## 身份\n{identity}")

    # -- Section 2: active patterns (top 5 by frequency) --
    if pattern_db.exists():
        patterns = _get_top_patterns(pattern_db, limit=5)
        if patterns:
            lines = ["## 活跃模式"]
            for p in patterns:
                # Columns may be NULL (or absent) in the database; fall back
                # to neutral values instead of crashing or printing "None".
                name = str(p.get("name") or "").replace("[", "(").replace("]", ")")
                desc = (p.get("description") or "")[:80]
                freq = p.get("frequency") or 0
                lines.append(f"- {name}:{desc}({freq}次)")
            sections.append("\n".join(lines))

    # -- Section 3: recent lessons (top 3 traces of the last 7 days) --
    if trace_db.exists():
        traces = _get_recent_high_value(trace_db, days=7, limit=3)
        if traces:
            lines = ["## 近期教训"]
            for t in traces:
                date_str = t.get("date", "")
                reflection = (t.get("reflection") or "")[:100]
                if reflection:
                    lines.append(f"- [{date_str}] {reflection}")
            # Only emit the section if at least one bullet followed the heading.
            if len(lines) > 1:
                sections.append("\n".join(lines))

    # -- Section 4: task board (first 5 unchecked items) --
    taskboard = workspace_root / "memory" / "taskboard.md"
    if taskboard.exists():
        tasks = taskboard.read_text(encoding="utf-8")
        pending = [
            line for line in tasks.split("\n") if line.strip().startswith("- [ ]")
        ]
        if pending:
            sections.append("\n".join(["## 待办"] + pending[:5]))

    # -- Section 5: system health (feedback statistics) --
    feedback_db = openclaw_home / "feedback.db"
    if feedback_db.exists():
        stats = _get_feedback_summary(feedback_db)
        if stats:
            sections.append(stats)

    # -- Assemble --
    brief = (
        f"# MEMORY BRIEF {today.isoformat()}\n"
        f"(MemCore 自动生成,完整记忆见 MEMORY.md 或 mos search)\n\n"
        + "\n\n".join(sections)
    )

    # Token budget: trim exactly once. _trim_brief appends a truncation notice
    # after cutting, so its output can still be over budget; re-trimming in a
    # loop until the estimate fits could then repeat forever.
    if count_tokens(brief) > max_tokens:
        brief = _trim_brief(brief, max_tokens)

    return brief


def _get_top_patterns(pattern_db: str, limit: int = 5) -> list[dict]:
    """Return the most frequent high-confidence patterns.

    Args:
        pattern_db: Path of the SQLite pattern database (anything
            ``sqlite3.connect`` accepts).
        limit: Maximum number of rows to return.

    Returns:
        Rows of the ``patterns`` table with ``confidence >= 0.7`` as dicts,
        ordered by ``frequency`` descending. An empty list is returned when
        the database cannot be queried (best-effort read).
    """
    query = (
        "SELECT * FROM patterns WHERE confidence >= 0.7 "
        "ORDER BY frequency DESC LIMIT ?"
    )
    try:
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; closing() is what actually releases the connection.
        with closing(sqlite3.connect(pattern_db)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, (limit,)).fetchall()
    except sqlite3.Error:
        return []
    return [dict(r) for r in rows]


def _get_recent_high_value(trace_db: str, days: int = 7, limit: int = 3) -> list[dict]:
    """Return recent traces that have a reflection and a high value score.

    Args:
        trace_db: Path of the SQLite trace database (anything
            ``sqlite3.connect`` accepts).
        days: Look-back window; rows whose ``date`` is on or after
            ``today - days`` (ISO string comparison) qualify.
        limit: Maximum number of rows to return.

    Returns:
        Rows of the ``traces`` table with ``value_score >= 0.5`` and a
        non-empty ``reflection``, as dicts ordered by ``value_score``
        descending. An empty list is returned when the database cannot be
        queried (best-effort read).
    """
    cutoff = (date.today() - timedelta(days=days)).isoformat()
    query = """SELECT * FROM traces
               WHERE date >= ? AND value_score >= 0.5 AND reflection != ''
               ORDER BY value_score DESC LIMIT ?"""
    try:
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; closing() is what actually releases the connection.
        with closing(sqlite3.connect(trace_db)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(query, (cutoff, limit)).fetchall()
    except sqlite3.Error:
        return []
    return [dict(r) for r in rows]


def _get_feedback_summary(feedback_db: str) -> Optional[str]:
    """Return a one-line health summary of the feedback database.

    Args:
        feedback_db: Path of the SQLite feedback database (anything
            ``sqlite3.connect`` accepts).

    Returns:
        A Markdown section with the number of ``feedback_events`` rows, the
        number of ``reference_log`` rows and the share of those with
        ``was_used = 1``; ``None`` when both tables are empty or the
        database cannot be queried (best-effort read).
    """
    try:
        # A sqlite3 connection used as a context manager only commits or
        # rolls back; closing() is what actually releases the connection.
        with closing(sqlite3.connect(feedback_db)) as conn:
            total = conn.execute("SELECT COUNT(*) FROM feedback_events").fetchone()[0]
            refs = conn.execute("SELECT COUNT(*) FROM reference_log").fetchone()[0]
            used = conn.execute(
                "SELECT COUNT(*) FROM reference_log WHERE was_used = 1"
            ).fetchone()[0]
    except sqlite3.Error:
        return None

    if total == 0 and refs == 0:
        return None

    # Avoid dividing by zero when there are events but no references yet.
    use_rate = used / refs if refs > 0 else 0
    return f"## 系统健康\n反馈: {total} | 引用: {refs} | 命中率: {use_rate:.0%}"


def _trim_brief(brief: str, max_tokens: int) -> str:
    """Cut *brief* down to whole lines that fit into *max_tokens*.

    Lines are kept from the top for as long as the kept text plus the
    truncation notice stays within the budget (as estimated by
    ``count_tokens``), so for a non-negative budget the returned text itself
    does not exceed ``max_tokens``.

    Args:
        brief: The full brief text.
        max_tokens: Token budget for the returned text.

    Returns:
        ``brief`` unchanged when it already fits; otherwise the leading lines
        that fit, followed by a truncation notice. The notice is left out
        only when the budget is too small to hold even the notice.
    """
    if count_tokens(brief) <= max_tokens:
        return brief

    notice = "\n\n<!-- 已裁剪,完整内容用 mos search 检索 -->"
    # Reserve room for the notice up front; appending it after trimming to the
    # full budget would push the result over the limit again.
    suffix = notice if count_tokens(notice) <= max_tokens else ""

    kept = []
    for line in brief.split("\n"):
        candidate = "\n".join(kept + [line]) + suffix
        if count_tokens(candidate) > max_tokens:
            break
        kept.append(line)

    return "\n".join(kept) + suffix


# ── CLI 入口 ──

def main() -> None:
    """CLI entry point: generate the brief, write it to disk and report size.

    The brief is written to ``~/.openclaw/workspace/MEMORY_BRIEF.md`` and
    echoed to stdout together with its estimated token count.
    """
    brief = generate_brief()
    brief_path = Path.home() / ".openclaw" / "workspace" / "MEMORY_BRIEF.md"

    # generate_brief tolerates a missing workspace, so the target directory
    # may not exist yet; create it rather than failing on the write.
    brief_path.parent.mkdir(parents=True, exist_ok=True)
    brief_path.write_text(brief, encoding="utf-8")

    token_count = count_tokens(brief)

    print(brief)
    print(f"\n{'='*50}")
    print(f"📄 已写入: {brief_path}")
    print(f"📊 Token 估算: ~{token_count} (原 MEMORY.md ~3500, 节省 {int((1 - token_count/3500) * 100)}%)")


if __name__ == "__main__":
    main()