文件预览

engine.py

查看 Secretary Skill 技能包中的文件内容。

文件内容

src/engine.py

"""
Secretary Skill Core Engine

The 3-Stage Pipeline:
  1. De-ambiguation — turn vague visions into concrete goals
  2. Vertical Splitting — break goals into milestones → atomic tasks
  3. Cron Automation — map tasks to a timeline and produce cron config
"""

from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional


# ── Data Models ──────────────────────────────────────────────────────


@dataclass
class Goal:
    """A concrete, measurable goal locked by the de-ambiguation engine."""
    raw_text: str
    refined_statement: str
    success_criteria: list[str]
    created_at: datetime = field(default_factory=datetime.now)


@dataclass
class Milestone:
    """A high-level milestone contributing to a Goal."""
    id: str
    title: str
    description: str
    deadline: Optional[str] = None
    tasks: list["AtomicTask"] = field(default_factory=list)


@dataclass
class AtomicTask:
    """A 2-hour executable atomic task."""
    id: str
    description: str
    duration_minutes: int = 120
    done: bool = False


# ── Stage 1: De-ambiguation ──────────────────────────────────────────


class DeambiguationEngine:
    """
    Conversational de-ambiguation engine.

    Asks ONE thoughtful question at a time, building on previous answers.
    The user signals when the goal feels clear enough — no fixed checklist.
    """

    # Question bank: each question targets one dimension of the goal.
    # Topics are ordered by logical flow: meaning → scope → measurement → timeline → stakeholders
    QUESTION_BANK = [
        { "topic": "meaning",
          "text": "这 {goal} 对你来说具体意味着什么?是某个数字、某种安全感、还是某件想做的事情?" },
        { "topic": "scope",
          "text": "你说的 {goal},大概是什么范围?是纯靠工资攒、还是包括投资收益或副业收入?" },
        { "topic": "measure",
          "text": "你怎么判断自己「做到了」?比如是账户余额达到那个数,还是某个场景下你希望自己能说一句「够了」?" },
        { "topic": "timeline",
          "text": "三年这个时间是怎么来的?是有一个具体的截止场景(买房、创业、跳槽),还是一个感觉上合适的时间?" },
        { "topic": "stakeholders",
          "text": "这个事情牵涉到其他人吗?比如家人、伴侣、合伙人——他们的预期和你的预期一致吗?" },
        { "topic": "risk",
          "text": "如果中途有意外(降薪、急用钱、市场波动),你能接受的底线是多少?或者说,什么情况下你会考虑调整计划?" },
    ]

    def __init__(self):
        self.answers: dict[str, str] = {}
        self.conversation: list[tuple[str, str, str]] = []  # (topic, question, answer)
        self.topics_covered: set[str] = set()
        self._current_index = 0

    def next_question(self) -> Optional[str]:
        """Return the next unanswered question, or None if all topics are covered."""
        while self._current_index < len(self.QUESTION_BANK):
            q = self.QUESTION_BANK[self._current_index]
            if q["topic"] not in self.topics_covered:
                # Fill in {goal} placeholder with user's raw text
                goal_placeholder = getattr(self, '_raw_text', '你的目标')
                return q["text"].format(goal=goal_placeholder)
            self._current_index += 1
        return None

    def record_answer(self, topic: str, question: str, answer: str):
        """Record an answer and mark the topic as covered."""
        self.answers[topic] = answer
        self.conversation.append((topic, question, answer))
        self.topics_covered.add(topic)
        self._current_index += 1

    def set_raw_text(self, raw_text: str):
        """Set the user's original goal text for use in follow-up questions."""
        self._raw_text = raw_text

    @property
    def topics_remaining(self) -> list[str]:
        """Return topics not yet discussed."""
        return [
            q["topic"]
            for q in self.QUESTION_BANK
            if q["topic"] not in self.topics_covered
        ]

    def is_goal_clear(self, user_says_clear: bool = False) -> bool:
        """
        The goal is "clear enough" when:
        1. The user explicitly says it's clear, AND
        2. At least the core 3 topics (meaning, scope, timeline) are covered.
        """
        core = {"meaning", "scope", "timeline"}
        return user_says_clear and core.issubset(self.topics_covered)

    def lock_goal(self, raw_text: str) -> Goal:
        """Synthesize everything discussed into a locked Goal."""
        # Build refined statement from conversation context
        parts = [raw_text.strip().rstrip('.')]
        if "timeline" in self.answers:
            parts.append(f"Timeline: {self.answers['timeline']}")
        if "meaning" in self.answers:
            parts.append(f"Purpose: {self.answers['meaning']}")
        refined = ". ".join(parts) + "."

        # Build success criteria from all covered topics
        criteria = []
        for topic in ["meaning", "measure", "timeline", "scope", "stakeholders", "risk"]:
            if topic in self.answers:
                criteria.append(f"[{topic}] {self.answers[topic]}")

        return Goal(
            raw_text=raw_text,
            refined_statement=refined,
            success_criteria=criteria,
        )


# ── Stage 2: Vertical Splitting ──────────────────────────────────────


class VerticalSplitter:
    """
    Takes a locked Goal and hierarchically breaks it down:
    Goal → Milestones → Atomic Tasks (2-hour executable units).
    """

    def __init__(self, goal: Goal, depth: int = 2):
        self.goal = goal
        self.depth = min(max(depth, 1), 3)  # clamp 1-3
        self.milestones: list[Milestone] = []

    def add_milestone(self, title: str, description: str,
                      deadline: Optional[str] = None) -> Milestone:
        ms = Milestone(
            id=f"ms-{len(self.milestones) + 1}",
            title=title,
            description=description,
            deadline=deadline,
        )
        self.milestones.append(ms)
        return ms

    def add_task(self, milestone_id: str, description: str,
                 duration: int = 120) -> Optional[AtomicTask]:
        for ms in self.milestones:
            if ms.id == milestone_id:
                task = AtomicTask(
                    id=f"{ms.id}-t{len(ms.tasks) + 1}",
                    description=description,
                    duration_minutes=duration,
                )
                ms.tasks.append(task)
                return task
        return None


# ── Stage 3: Cron Automation ─────────────────────────────────────────


@dataclass
class CronJob:
    """A scheduled job payload for the automation bridge."""
    skill_target: str = "Secretary Skill"
    cron_expression: str = ""
    action_type: str = "active_reminder"
    payload: dict = field(default_factory=dict)


class CronScheduler:
    """
    Maps tasks onto a timeline and formats JSON payloads
    ready for injection into an automation engine.
    """

    FREQ_MAP = {
        "daily": "0 9 * * 1-5",
        "weekly": "0 9 * * 1",
        "weekdays": "0 9 * * 1-5",
        "weekends": "0 9 * * 0,6",
    }

    def schedule(self, task: AtomicTask, freq: str = "daily",
                 goal_snapshot: str = "") -> CronJob:
        cron_expr = self.FREQ_MAP.get(freq, "0 9 * * 1-5")
        return CronJob(
            cron_expression=cron_expr,
            payload={
                "original_goal_snapshot": goal_snapshot,
                "current_task": task.description,
                "trigger_prompt": (
                    "Re-anchor the user to the grand vision, "
                    "verify if the current task is ready for today, "
                    "and listen for a `/align` call if scheduling "
                    "conflicts exist."
                ),
            },
        )

    def to_json(self, job: CronJob) -> dict:
        return {
            "skill_target": job.skill_target,
            "cron_expression": job.cron_expression,
            "action_type": job.action_type,
            "payload": job.payload,
        }