文件预览

pipeline.py

查看 lobster-novel 技能包中的文件内容。

文件内容

core/pipeline.py

#!/usr/bin/env python3
"""
lobster-novel: writing pipeline coordinator
"""
import json, re, sys
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict

sys.path.insert(0, str(Path(__file__).parent))
from bible import BibleManager, Character, ChapterSpec, Arc
from continuity import ContinuityTracker, ChapterState


class Pipeline:
    """Orchestrates the novel writing pipeline.

    This is a stateless coordinator that:
    1. Reads project state (bible + continuity)
    2. Prepares writing context for the next chapter
    3. Validates generated output
    4. Updates state after writing
    """

    def __init__(self, project_dir: Path):
        self.dir = Path(project_dir)
        self.bible = BibleManager(self.dir)
        self.continuity = ContinuityTracker(self.dir)
        self._item_verifier = None

    @property
    def item_verifier(self):
        """Lazy-init SceneItemVerifier.
        Calls LLM to verify scene item plausibility before writing."""
        if self._item_verifier is None:
            from scene_items import SceneItemVerifier
            self._item_verifier = SceneItemVerifier(self.dir)
        return self._item_verifier

    def verify_scene_items(self, chapter_num: int, chapter_plan: dict):
        """Verify scene item plausibility before writing chapter.
        Returns SceneItemChecklist with verified items + rejected list."""
        return self.item_verifier.verify_chapter(chapter_num, chapter_plan)

    # ── Context Preparation ───────────────────────────────────

    def get_writing_context(self, chapter: Optional[int] = None) -> dict:
        """Prepare the context needed to write the next chapter"""
        b = self.bible.bible
        ch_n = chapter or (b.current_chapter + 1)

        # Bible summary
        bible_summary = (
            f"## {b.title}\n"
            f"Genre: {b.genre}/{b.subgenre}\n"
            f"Tone: {b.tone}  POV: {b.pov}\n"
            f"Theme: {b.theme}\n"
        )

        # Characters overview
        char_lines = []
        for c in b.characters.values():
            char_lines.append(
                f"- {c.name} ({c.role}): {c.current_state or 'alive'} | {c.background[:80]}")
        char_summary = "\n".join(char_lines) if char_lines else "none yet"

        # Plot context
        arcs = b.arcs
        plot_summary = (
            f"Current arc: {b.current_arc}\n"
            f"Total arcs: {len(arcs)}\n"
            f"Chapters written: {b.current_chapter}\n"
        )

        # Continuity context
        cont_summary = self.continuity.get_summary_for(max(ch_n - 1, 0))
        hook_status = self.continuity.get_hook_status()
        overdue_hooks = self.bible.check_hooks()

        # Chapter spec (if exists)
        spec = b.chapters.get(ch_n)

        return {
            "chapter": ch_n,
            "bible_summary": bible_summary,
            "characters": char_summary,
            "plot": plot_summary,
            "continuity": cont_summary or "first chapter",
            "hooks": hook_status,
            "overdue_hooks": overdue_hooks,
            "spec": {
                "title": spec.title if spec else "",
                "summary": spec.summary if spec else "",
                "scenes": spec.scene_beats if spec else [],
                "pov": spec.pov if spec else "",
                "emotional_turn": spec.emotional_turn if spec else "",
            } if spec else {},
        }

    def format_writing_prompt(self, ctx: dict) -> str:
        """Format the context into a writing prompt for the agent"""
        spec = ctx.get("spec", {})
        lines = [
            f"# Chapter {ctx['chapter']} Writing Brief",
            "",
            f"## Novel",
            ctx.get("bible_summary", ""),
            "",
            f"## Characters",
            ctx.get("characters", ""),
            "",
            f"## Plot Context",
            ctx.get("plot", ""),
            "",
            f"## Continuity (last chapters)",
            ctx.get("continuity", "first chapter"),
            "",
        ]
        if ctx.get("hooks"):
            lines += ["## Active Hooks", ctx["hooks"], ""]
        if ctx.get("overdue_hooks"):
            lines += ["## Overdue Hooks (resolve these!)"]
            lines += [f"- {h}" for h in ctx["overdue_hooks"]]
            lines.append("")
        if spec.get("title"):
            lines.append(f"## Chapter Spec")
            if spec.get("title"):
                lines.append(f"Title: {spec['title']}")
            if spec.get("summary"):
                lines.append(f"Summary: {spec['summary']}")
            if spec.get("scenes"):
                lines.append(f"Scenes:")
                lines.extend(f"- {s}" for s in spec["scenes"])
            if spec.get("pov"):
                lines.append(f"POV: {spec['pov']}")
            lines.append("")

        lines += [
            "## Requirements",
            "- Write ~2000-3000 words",
            "- Hook at chapter end",
            "- Advance at least one character arc",
            "- Pay off any overdue hooks if possible",
            "- Keep prose consistent with tone",
            "",
            "## Output:",
            "---",
            "# Chapter X: [Title]",
            "",
            "(chapter text here)",
            "---",
        ]
        return "\n".join(lines)

    # ── Post-Write Processing ─────────────────────────────────

    def process_chapter_output(self, chapter: int, text: str) -> dict:
        """Process generated chapter: validate + update state"""
        word_count = len(text.replace("\n", "").replace(" ", ""))
        # rough word count for Chinese: use character count
        char_count = len([c for c in text if '\u4e00' <= c <= '\u9fff'])

        result = {
            "chapter": chapter,
            "char_count": char_count,
            "total_chars": len(text),
            "word_count_est": word_count,
            "has_title": bool(re.search(r'^#\s+第.*章', text, re.M)),
            "has_hook": self._detect_hook(text),
            "continuity_risks": [],
        }
        return result

    def _detect_hook(self, text: str) -> bool:
        """Heuristic: check if last paragraph feels like a hook"""
        paras = [p for p in text.split("\n\n") if p.strip()]
        if not paras:
            return False
        last = paras[-1].strip()
        hook_indicators = ["?", "!", "突然", "竟然", "究竟", "谁", "什么", "...", "——"]
        return any(indicator in last for indicator in hook_indicators)

    def save_chapter(self, chapter: int, text: str, summary: str,
                     char_changes: dict, new_hooks: list, resolved: list):
        """Save chapter and update continuity"""
        output_dir = self.dir / "chapters"
        output_dir.mkdir(parents=True, exist_ok=True)

        # save chapter text
        out_file = output_dir / f"ch{chapter:03d}.md"
        out_file.write_text(text, encoding="utf-8")

        # update bible
        spec = self.bible.bible.chapters.get(chapter)
        if not spec:
            spec = ChapterSpec(number=chapter)
        self.bible.add_chapter(spec)

        # plant/payoff hooks
        for h_desc in new_hooks:
            self.bible.plant_hook(chapter, h_desc, chapter + 5)
        for h_desc in resolved:
            self.bible.payoff_hook(chapter, h_desc)

        # update characters
        for name, new_state in char_changes.items():
            self.bible.update_character_state(name, new_state)

        # continuity record
        state = ChapterState(
            chapter=chapter,
            summary=summary,
            changed_characters=char_changes,
            new_hooks=new_hooks,
            resolved_hooks=resolved,
        )
        self.continuity.append(state)

        return {
            "chapter": chapter,
            "file": str(out_file),
            "word_count": len(text),
        }

    # ── Review Prep ───────────────────────────────────────────

    def get_review_context(self, chapter: int) -> dict:
        """Prepare context for reviewing a chapter"""
        ch_file = self.dir / "chapters" / f"ch{chapter:03d}.md"
        if not ch_file.exists():
            return {"error": f"chapter {chapter} not found"}

        text = ch_file.read_text(encoding="utf-8")
        ctx = self.get_writing_context(chapter)
        return {
            "chapter": chapter,
            "text": text[:8000],
            "bible_summary": ctx.get("bible_summary", ""),
            "characters": ctx.get("characters", ""),
            "hooks": ctx.get("hooks", ""),
        }

    # ── 6-Stage Workflow ─────────────────────────────────────
    # Inspired by novel-writer-structure:
    #   Write → Self-check → Review → Score → Conflict Check → Polish → Revise

    def run_self_check(self, chapter: int, text: str) -> list:
        """Stage 1: Self-check — author's own quick review (static checks)."""
        from quality_check import QualityChecker
        report = QualityChecker.check_text(text, chapter)
        return report.issues

    def run_review_all(self, chapter: int, text: str) -> dict:
        """Stage 2: Full review — static + scorer + conflict checks."""
        from quality_check import QualityChecker
        from scorer import SixDimScorer
        from conflict_detector import ConflictDetector

        issues = []

        # Static quality
        report = QualityChecker.check_text(text, chapter)
        issues.extend(report.issues)

        # No-LLM check
        detector = ConflictDetector(self.dir)
        conflicts = detector.check_all(chapter, text)
        for c in conflicts:
            issues.append({
                "severity": c.severity,
                "role": "Storyteller",
                "category": c.category,
                "description": c.description,
                "suggestion": c.suggestion,
            })

        return {
            "issues": issues,
            "static_score": report.scores.get("static", 0),
            "conflicts": len(conflicts),
        }

    def run_scoring(self, text: str, bible_context: str = "") -> dict:
        """Stage 3: 6-dimension scoring (novel-evaluator-inspired)."""
        from scorer import SixDimScorer
        result = SixDimScorer.score_all(text, bible_context)
        return {
            "total": result.total,
            "avg": result.avg,
            "dimensions": {k: v.score for k, v in result.dimensions.items()},
        }

    def polish_text(self, text: str, profile: Optional[str] = None) -> str:
        """Stage 4: Polish — basic text-level improvements without LLM.
        Fixes: extra whitespace, inconsistent line breaks, dialog formatting.
        """
        # Strip trailing whitespace per line
        lines = [l.rstrip() for l in text.split("\n")]
        # Normalize multiple blank lines to at most one
        result = []
        blank_count = 0
        for l in lines:
            if not l.strip():
                blank_count += 1
                if blank_count <= 1:
                    result.append("")
            else:
                blank_count = 0
                result.append(l)
        # Fix dialog spacing: remove spaces between quote and text
        cleaned = "\n".join(result)
        cleaned = re.sub(r'"\s+', '"', cleaned)
        cleaned = re.sub(r'\s+"', '"', cleaned)
        cleaned = re.sub(r'[\u201c]\s+', '\u201c', cleaned)
        cleaned = re.sub(r'\s+[\u201d]', '\u201d', cleaned)
        return cleaned.strip()

    def revise_from_issues(self, text: str, issues: list) -> str:
        """Stage 5: Revise — apply simple P0 fixes automatically.
        This is a static revision (not LLM). For complex revisions,
        the user can invoke LLM-assisted revision.
        """
        # Only handle specific P0 patterns automatically
        p0_fixes = {
            r'http[s]?://': '(link removed)',
        }
        for pat, replacement in p0_fixes.items():
            text = re.sub(pat, replacement, text)
        return text

    def full_quality_gate(self, chapter: int, text: str,
                          bible_context: str = "") -> dict:
        """Run all 6 stages and return combined report.
        Stages: Self-check → Review → Score → Conflict Check → Polish → Revise
        """
        issues = self.run_self_check(chapter, text)
        review = self.run_review_all(chapter, text)
        scoring = self.run_scoring(text, bible_context)
        polished = self.polish_text(text)
        revised = self.revise_from_issues(polished, review.get("issues", []))

        p0_count = sum(1 for i in review.get("issues", []) if i.get("severity") == "P0")
        p1_count = sum(1 for i in review.get("issues", []) if i.get("severity") == "P1")

        return {
            "chapter": chapter,
            "original_len": len(text),
            "polished_len": len(polished),
            "revised_len": len(revised),
            "p0_count": p0_count,
            "p1_count": p1_count,
            "static_score": review.get("static_score", 0),
            "scoring": scoring,
            "conflicts": review.get("conflicts", 0),
            "issues": review.get("issues", [])[:10],
            "passed": p0_count == 0,
        }

    # ── Multi-Agent Pipeline ────────────────────────────────────
    # Inspired by 马良写作 7-Agent collaboration:
    #   Planner → Writer → Expander → Reviewer → Reviser → Polish → Audit

    def multi_agent_write(self, chapter_num: int, api_key: str = "",
                          template_name: str = "") -> dict:
        """Multi-stage writing pipeline with internal iteration.
        Stages: Plan → Write → Expand → Review → Revise → Polish
        """
        from chapters import ChapterGenerator
        gen = ChapterGenerator(api_key=api_key)
        if not gen.api_key:
            return {"error": "SENSENOVA_API_KEY not set"}

        from chapters import count_tokens, token_report

        ctx = self.get_writing_context(chapter_num)
        bible_context = ctx.get("bible_summary", "")
        result = {"chapter": chapter_num, "stages": [], "total_tokens": 0}

        # Stage 1: Planner — generate chapter outline/intent
        plan = self._agent_plan(gen, chapter_num, ctx, template_name)
        result["stages"].append({"stage": "plan", "output": plan[:100]})

        # Stage 2: Writer — write full chapter from plan
        text = self._agent_write(gen, chapter_num, ctx, plan)
        result["stages"].append({
            "stage": "write", "chars": len(text),
            "tokens": gen.last_tokens.get("total_tokens", 0),
        })
        result["total_tokens"] += gen.last_tokens.get("total_tokens", 0)

        # Stage 3: Quality check + auto-revise loop (max 2 iterations)
        for attempt in range(2):
            review_result = self.run_review_all(chapter_num, text)
            p0_issues = [i for i in review_result.get("issues", [])
                         if i.get("severity") == "P0"]
            if not p0_issues:
                break
            # Revise via LLM
            text = self._agent_revise(gen, chapter_num, text, p0_issues[:3])
            result["stages"].append({
                "stage": f"revise-{attempt + 1}",
                "p0_fixed": len(p0_issues),
                "tokens": gen.last_tokens.get("total_tokens", 0),
            })
            result["total_tokens"] += gen.last_tokens.get("total_tokens", 0)

        # Stage 4: Polish (static, no API)
        from chinese_typeset import ChineseTypeset
        polished = ChineseTypeset.clean_all(text)
        result["stages"].append({"stage": "polish", "chars": len(polished)})

        result["final_text"] = polished
        return result

    def _agent_plan(self, gen, chapter_num: int, ctx: dict,
                     template_name: str = "") -> str:
        """Stage 1: Planner — generate chapter intent/outline."""
        brief = ctx.get("bible_summary", "")[:500]
        chars = ctx.get("characters", "")[:300]
        hooks = ctx.get("hooks", "")

        template_hint = ""
        if template_name:
            from beat_sheet import get_template
            bs = get_template(template_name)
            if bs:
                beat = bs.get_beat_for_chapter(chapter_num)
                if beat:
                    template_hint = f"\nBeat: {beat.name}\nFunction: {beat.function}\nTone: {beat.emotional_tone}"

        system = "你是一名小说策划师。根据上下文生成章节规划(500字内):核心冲突、场景顺序、角色动态、章节末钩子。"
        prompt = (
            f"# Chapter {chapter_num} Planning\n\n"
            f"## Novel Context\n{brief}\n\n"
            f"## Characters\n{chars}\n\n"
            f"## Hooks\n{hooks[:200]}\n"
            f"{template_hint}\n\n"
            "Output format:\n"
            "Core Conflict:(一句话)\n"
            "Scenes:(场景列表)\n"
            "Character Arcs:(角色变化)\n"
            "End Hook:(章节末钩子)"
        )
        return gen._call_api([
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ], temp=0.5, max_tokens=1024)

    def _agent_write(self, gen, chapter_num: int, ctx: dict,
                      plan: str) -> str:
        """Stage 2: Writer — generate full chapter from plan."""
        bible = ctx.get("bible_summary", "")[:800]
        chars = ctx.get("characters", "")[:400]
        cont = ctx.get("continuity", "")[:400]

        system = "你是一名优秀的中文网文作者。根据章节规划和圣经,创作完整的章节内容。"
        prompt = (
            f"# Chapter {chapter_num}\n\n"
            f"## Bible\n{bible}\n\n"
            f"## Characters\n{chars}\n\n"
            f"## Continuity\n{cont}\n\n"
            f"## Plan\n{plan}\n\n"
            "Requirements:\n"
            "- 2000-3000字\n"
            "- 章节末留钩子\n"
            "- 推进至少一个角色弧\n"
            "- 自然过渡\n\n"
            "Output:\n"
            f"# Chapter {chapter_num}: [Title]\n\n(text)"
        )
        return gen._call_api([
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ], temp=0.7, max_tokens=4096)

    def _agent_revise(self, gen, chapter_num: int, text: str,
                       issues: list) -> str:
        """Stage 3+: Reviser — fix specific issues via LLM."""
        if not issues:
            return text
        issue_text = "\n".join(
            f"- [{i.get('severity','P1')}] {i.get('description','')[:80]}"
            for i in issues[:3])

        system = "你是一名小说修订编辑。针对提出的问题修改章节内容。保持原文风格和情节,只改问题区域。"
        prompt = (
            f"## Issues to Fix\n{issue_text}\n\n"
            f"## Chapter Text\n{text[:5000]}\n\n"
            "输出修改后的完整章节文本:"
        )
        return gen._call_api([
            {"role": "system", "content": system},
            {"role": "user", "content": prompt},
        ], temp=0.3, max_tokens=4096)

    def quality_summary(self, chapter: int, text: str,
                        bible_context: str = "") -> str:
        """Human-readable quality gate summary."""
        result = self.full_quality_gate(chapter, text, bible_context)
        lines = [
            f"# Chapter {chapter} Quality Gate",
            f"",
            f"## Static Score: {result['static_score']}/100",
            f"## 6-Dim Score: {result['scoring']['avg']}/100",
            f"   Total: {result['scoring']['total']}/600",
            f"",
            f"## Issues",
            f"   P0 (must fix): {result['p0_count']}",
            f"   P1 (recommend): {result['p1_count']}",
            f"   Conflicts: {result['conflicts']}",
            f"",
            f"## Polish",
            f"   Original: {result['original_len']} chars",
            f"   Polished: {result['polished_len']} chars",
            f"   Revised:  {result['revised_len']} chars",
            f"",
        ]
        if result['passed']:
            lines.append("## Result: PASS ✅")
        else:
            lines.append(f"## Result: FAIL ({result['p0_count']} P0 issues) ❌")
            for i in result["issues"][:5]:
                if i.get("severity") == "P0":
                    lines.append(f"  - {i.get('description','')[:60]}")
        return "\n".join(lines)


if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(description="lobster-novel pipeline")
    parser.add_argument("--dir", default="./my-novel", help="project dir")
    parser.add_argument("--context", type=int, help="get writing context for chapter")
    parser.add_argument("--prompt", type=int, help="get writing prompt for chapter")
    args = parser.parse_args()

    pipe = Pipeline(Path(args.dir))

    if args.context:
        ctx = pipe.get_writing_context(args.context)
        print(json.dumps(ctx, ensure_ascii=False, indent=2))
    elif args.prompt:
        ctx = pipe.get_writing_context(args.prompt)
        print(pipe.format_writing_prompt(ctx))