文件预览

migrate_3json.py

查看 lobster-novel 技能包中的文件内容。

文件内容

scripts/migrate_3json.py

#!/usr/bin/env python3
"""
migrate_3json.py — V5–V10 小说项目 3JSON → story-state.json 一键迁移

读取 project_dir 下的 chapter_appearances.json / character_roster.json / hooks.json,
自动检测版本格式,生成统一的 story-state.json。

Usage:
    python3 migrate_3json.py <project_dir>
    python3 migrate_3json.py /path/to/novels/V10_灰港镇的异客
    python3 migrate_3json.py --all          # 迁移 novels/ 下所有项目
    python3 migrate_3json.py --check <project_dir>    # 只检查不写入 (也可与 --all 连用)
    python3 migrate_3json.py --dry-run <project_dir>  # 预览迁移结果但不写入
"""

from __future__ import annotations

import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Optional

# ── Make sure core/ is importable ─────────────────────────────
# `_base` is the parent of the directory that contains this script. Both it
# and its core/ subdirectory are pushed to the front of sys.path (unless they
# are already listed) so the `core.story_state` import below can resolve when
# this file is executed directly.
_base = Path(__file__).resolve().parent.parent
for p in [str(_base), str(_base / "core")]:
    if p not in sys.path:
        sys.path.insert(0, p)

from core.story_state import StoryState, CharacterState, HookState, ChapterRecord, StrandState


# ═══════════════════════════════════════════════════════════════
#  Chapter number parsing helpers
# ═══════════════════════════════════════════════════════════════

CH_RE = re.compile(r"ch(\d+)", re.IGNORECASE)


def _parse_chapter_num(raw: Any) -> int:
    """Extract a chapter number from an int or a chapter-style string.

    Accepted inputs:
    - an ``int`` (returned unchanged, including negative values),
    - a string containing ``ch<digits>`` in any letter case, e.g. "Ch001",
      "v10ch001" or "V5Ch001" (the digits after the first match are used),
    - a string that ``int()`` can parse directly, e.g. "7".

    Anything else yields 0.
    """
    if isinstance(raw, int):
        return raw
    if not isinstance(raw, str):
        return 0

    found = CH_RE.search(raw)
    if found is not None:
        return int(found.group(1))

    # No "ch<digits>" marker: fall back to treating the text as a bare number.
    try:
        number = int(raw)
    except ValueError:
        number = 0
    return number


def _clean_title(raw: str) -> str:
    """Drop a leading volume/chapter tag from a title.

    Example: '[V5Ch001] 标题' becomes '标题'. The square brackets around the
    tag are optional; surrounding whitespace is trimmed from the result.
    """
    prefix_pattern = r"^\[?[Vv]\d+[Cc][Hh]\d+\]?\s*"
    without_prefix = re.sub(prefix_pattern, "", raw)
    return without_prefix.strip()


def _name_to_id(name: str) -> str:
    """Build a stable character ID from a display name.

    The name is stripped, every middle dot ("·") and space is replaced by an
    underscore, and the result is prefixed with "char_".
    """
    separators_to_underscore = str.maketrans({"·": "_", " ": "_"})
    slug = name.strip().translate(separators_to_underscore)
    return "char_" + slug


# ═══════════════════════════════════════════════════════════════
#  Format detectors & loaders
# ═══════════════════════════════════════════════════════════════


def _detect_format(data: Any, filename: str) -> str:
    """Return a format tag for parsed JSON ``data`` loaded from ``filename``.

    Tags by file:
    - chapter_appearances.json: "list_v5", "dict_v10", "dict_v9", "list_v8",
      "dict_unknown" (or "unknown" when data is neither list nor dict)
    - character_roster.json: "list", "dict_v10", "dict_v5", "unknown"
    - hooks.json: "list", "dict", "unknown"
    - any other filename: "unknown"
    """
    is_list = isinstance(data, list)
    is_dict = isinstance(data, dict)

    if filename == "chapter_appearances.json":
        if is_list:
            # V5 style: [{chapter, characters, key_events}]
            return "list_v5"
        if not is_dict:
            return "unknown"
        # V10: {volume_key: {Ch001: {title, characters: {...}}}} — recognised by
        # any nested dict that has at least one key starting with "Ch".
        for volume in data.values():
            if isinstance(volume, dict) and any(key.startswith("Ch") for key in volume):
                return "dict_v10"
        # V9: {entries: [{chapter, title, characters, key_events}], ...}
        if "entries" in data:
            return "dict_v9"
        # V8 style: {characters: [...]}
        if isinstance(data.get("characters"), list):
            return "list_v8"
        return "dict_unknown"

    if filename == "character_roster.json":
        if is_list:
            return "list"
        if is_dict:
            # V10: {characters: {...}, carry_over: {...}}; V5/V9: {char_name: {...}}
            return "dict_v10" if "characters" in data else "dict_v5"
        return "unknown"

    if filename == "hooks.json":
        if is_list:
            return "list"
        return "dict" if is_dict else "unknown"

    return "unknown"


# ═══════════════════════════════════════════════════════════════
#  Normalizers: load raw data → normalized internal structures
# ═══════════════════════════════════════════════════════════════


def _normalize_characters(raw: dict | list, fmt: str) -> dict[str, dict[str, Any]]:
    """Normalize character_roster data into ``{id: character_dict}``.

    Each value has the keys ``id``, ``name``, ``role``, ``first_appearance``,
    ``last_appearance``, ``status``, ``state`` and ``key_items``.

    Args:
        raw: Parsed character_roster.json content.
        fmt: Format tag from ``_detect_format``. "dict_v10" reads the roster
            from ``raw["characters"]``; every other tag uses ``raw`` itself.

    Returns:
        Mapping of generated character ID to normalized character data.
        Entries that are not dicts are skipped; unusable input yields ``{}``.
    """
    role_map = {
        "主角": "主角", "双主角": "主角", "主角同伴": "核心角色",
        "配角": "配角", "异客": "异客",
        "protagonist": "主角", "mentor": "核心角色",
        "supporting": "配角",
    }

    # V10 nests the roster under "characters"; other formats are the roster itself.
    if fmt == "dict_v10" and isinstance(raw, dict):
        source = raw.get("characters", {})
    else:
        source = raw

    # The roster may be a {name: info} mapping or a list of info dicts,
    # regardless of which wrapper it came in.
    if isinstance(source, dict):
        chars = source
    elif isinstance(source, list):
        chars = {}
        for entry in source:
            if not isinstance(entry, dict):
                continue
            name = entry.get("name", entry.get("id", ""))
            if name is None:
                name = ""
            chars[str(name)] = entry
    else:
        chars = {}

    result: dict[str, dict[str, Any]] = {}
    for name, info in chars.items():
        if not isinstance(info, dict):
            continue
        name = str(name)
        cid = _name_to_id(name)

        role = info.get("role", "配角")
        # Unknown or non-string roles fall back to the supporting-role label.
        role = role_map.get(role, "配角") if isinstance(role, str) else "配角"

        first = _parse_chapter_num(info.get("first_appearance", 0))
        # A missing or unparseable last appearance defaults to the first one.
        last = _parse_chapter_num(info.get("last_appearance", 0)) or first

        state = info.get("state", info.get("description", ""))
        if isinstance(state, list):
            state = ", ".join(str(item) for item in state)
        elif state is not None and not isinstance(state, str):
            # dicts, numbers, ... are stringified so the slice below is safe.
            state = str(state)

        key_items = info.get("key_items", [])
        if not isinstance(key_items, list):
            key_items = []

        result[cid] = {
            "id": cid,
            "name": name,
            "role": role,
            "first_appearance": max(first, 0),
            "last_appearance": max(last, 0),
            "status": info.get("status", "active"),
            "state": state[:200] if state else "",
            "key_items": key_items[:10],
        }

    return result


def _chapter_entry(num: int, data: dict, title: Any, scene: Any) -> dict[str, Any]:
    """Assemble one normalized chapter entry from a raw chapter dict."""
    chars = data.get("characters", [])
    if isinstance(chars, dict):
        chars = list(chars.keys())
    elif not isinstance(chars, list):
        chars = []
    events = data.get("key_events", [])
    return {
        "number": num,
        # A null or non-string title would make _clean_title raise.
        "title": _clean_title(title) if isinstance(title, str) else "",
        "word_count": data.get("word_count", 0),
        "scene": scene,
        "characters_present": chars,
        "key_events": events if isinstance(events, list) else [],
    }


def _normalize_chapters(raw: dict | list, fmt: str) -> dict[int, dict[str, Any]]:
    """Normalize chapter_appearances data into ``{chapter_number: chapter_dict}``.

    Each value has the keys ``number``, ``title``, ``word_count``, ``scene``,
    ``characters_present`` and ``key_events``.

    Args:
        raw: Parsed chapter_appearances.json content.
        fmt: Format tag from ``_detect_format`` ("list_v5", "list_v8",
            "dict_v9", "dict_v10" or "dict_unknown").

    Returns:
        Mapping keyed by chapter number. Entries that are not dicts or whose
        chapter number is not positive are skipped. When a number occurs more
        than once, the later entry wins. "list_v8" carries no chapter-level
        data and always yields ``{}``.
    """
    entries: list[dict] = []

    if fmt == "list_v8":
        # list_v8: {characters: [...], other keys} has no per-chapter data.
        return {}

    if fmt == "list_v5":
        for e in raw if isinstance(raw, list) else []:
            if not isinstance(e, dict):
                continue
            num = _parse_chapter_num(e.get("chapter", 0))
            if num <= 0:
                continue
            if "scene" in e:
                scene = e["scene"]
            else:
                # Fall back to the first location; an empty list must not raise.
                locations = e.get("locations")
                scene = locations[0] if isinstance(locations, list) and locations else ""
            entries.append(_chapter_entry(num, e, e.get("title", ""), scene))
    elif fmt == "dict_v9":
        raw_entries = raw.get("entries", []) if isinstance(raw, dict) else []
        if not isinstance(raw_entries, list):
            raw_entries = []
        for e in raw_entries:
            if not isinstance(e, dict):
                continue
            num = _parse_chapter_num(e.get("chapter", ""))
            if num <= 0:
                continue
            entries.append(_chapter_entry(num, e, e.get("title", ""), e.get("scene", "")))
    elif fmt == "dict_v10":
        # {volume_key: {Ch001: {...}, ...}}
        volumes = raw.values() if isinstance(raw, dict) else []
        for chapters_dict in volumes:
            if not isinstance(chapters_dict, dict):
                continue
            for ch_key, ch_data in chapters_dict.items():
                if not isinstance(ch_data, dict):
                    continue
                num = _parse_chapter_num(ch_key)
                if num <= 0:
                    continue
                scene = ch_data.get("scene", "")
                if isinstance(scene, dict):
                    # Per-character scenes: keep the first string value.
                    scene = next((s for s in scene.values() if isinstance(s, str)), "")
                title = ch_data.get("title", ch_data.get("chapter_title", ""))
                entries.append(_chapter_entry(num, ch_data, title, scene))
    elif fmt == "dict_unknown":
        for k, v in (raw.items() if isinstance(raw, dict) else ()):
            if isinstance(v, dict) and "characters" in v:
                entries.append(_normalize_entry_guess(k, v))
            elif isinstance(v, list):
                # {ch031: [char_names]}
                num = _parse_chapter_num(k)
                if num > 0:
                    entries.append({
                        "number": num, "title": "", "word_count": 0,
                        "scene": "", "characters_present": v, "key_events": [],
                    })

    result: dict[int, dict[str, Any]] = {}
    for entry in entries:
        num = entry.get("number", 0)
        if num <= 0:
            # _normalize_entry_guess may return an entry whose key had no number.
            continue
        result[num] = entry
    return result


def _normalize_entry_guess(key: str, data: dict) -> dict:
    """Build a chapter entry from a ``key: data`` pair of unknown layout.

    The chapter number is parsed from ``key``; the title is read from
    "title" (falling back to "chapter_title") and cleaned of its
    volume/chapter prefix. A dict under "characters" is reduced to its keys.
    """
    number = _parse_chapter_num(key)

    present = data.get("characters", [])
    if isinstance(present, dict):
        present = list(present.keys())

    raw_title = data.get("title", data.get("chapter_title", ""))

    return dict(
        number=number,
        title=_clean_title(raw_title),
        word_count=data.get("word_count", 0),
        scene=data.get("scene", ""),
        characters_present=present,
        key_events=data.get("key_events", []),
    )


def _normalize_hooks(raw: list | dict, fmt: str) -> list[dict[str, Any]]:
    """Normalize hooks.json data into a list of hook dicts.

    Each dict has the keys ``id``, ``description``, ``type``,
    ``chapter_created``, ``chapter_resolved`` (always ``None``), ``status``
    and ``expected_payoff``.

    Args:
        raw: Parsed hooks.json content.
        fmt: "list" for a plain list of hooks; "dict" for either
            ``{hooks: [...], ...}`` or a mapping whose dict values are hooks.

    Returns:
        One normalized dict per hook, in input order. Non-dict entries are
        skipped. IDs are unique within the returned list: a hook without an
        ID gets a deterministic md5-based one, and a repeated ID gets a
        numeric suffix ("_2", "_3", ...) so later hooks do not overwrite
        earlier ones when keyed by ID.
    """
    import hashlib

    status_map = {"active": "活跃", "兑现": "兑现", "resolved": "兑现",
                  "open": "活跃", "pending": "活跃", "evolved": "活跃",
                  "completed": "兑现", "done": "兑现", "finished": "兑现"}

    entries: list = []
    if fmt == "list":
        entries = raw if isinstance(raw, list) else []
    elif fmt == "dict" and isinstance(raw, dict):
        # Try nested hooks key first (卷六 format: {hooks: [...], metadata})
        nested = raw.get("hooks")
        if isinstance(nested, list):
            entries = nested
        else:
            entries = [v for v in raw.values() if isinstance(v, dict)]

    result: list[dict[str, Any]] = []
    used_ids: set[str] = set()

    for h in entries:
        if not isinstance(h, dict):
            continue

        # V9 format uses {hook, desc, planted, payoff, status}
        hook_desc = h.get("description", h.get("desc", h.get("hook", "")))
        if hook_desc is None:
            hook_desc = ""
        elif not isinstance(hook_desc, str):
            # Keep .encode() and slicing below safe for non-string descriptions.
            hook_desc = str(hook_desc)

        hid = h.get("id", "")
        if hid:
            hid = str(hid)
        else:
            # md5 of the description gives a deterministic ID across runs.
            hid = f"migrated_hook_{hashlib.md5(hook_desc.encode('utf-8')).hexdigest()[:10]}"
        if hid in used_ids:
            # Same explicit ID or same description: disambiguate deterministically.
            suffix = 2
            while f"{hid}_{suffix}" in used_ids:
                suffix += 1
            hid = f"{hid}_{suffix}"
        used_ids.add(hid)

        ch_raw = h.get("chapter", h.get("planted", h.get("planted_chapter", 0)))
        chapter = _parse_chapter_num(ch_raw)

        htype = h.get("type", h.get("tag", "悬念"))

        raw_status = h.get("status", "活跃")
        if isinstance(raw_status, str):
            status = status_map.get(raw_status.strip().lower(), "活跃")
        else:
            status = "活跃"

        payoff = h.get("expected_payoff", h.get("payoff", h.get("expected_payoff_chapter", "")))

        result.append({
            "id": hid,
            "description": hook_desc[:300],
            "type": htype,
            "chapter_created": max(chapter, 0),
            "chapter_resolved": None,
            "status": status,
            "expected_payoff": str(payoff) if payoff else "",
        })

    return result


# ═══════════════════════════════════════════════════════════════
#  Main migration
# ═══════════════════════════════════════════════════════════════


def _load_legacy_json(path: Path, normalizer: Any, default: Any,
                      report: dict[str, Any], format_key: str) -> Any:
    """Read one legacy JSON file, detect its format and normalize it.

    Returns ``default`` when the file is missing or cannot be read or parsed;
    in the latter case a warning is appended to ``report["warnings"]``. On
    success the detected format is stored under ``report[format_key]``.
    """
    if not path.exists():
        return default
    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
        fmt = _detect_format(raw, path.name)
        normalized = normalizer(raw, fmt)
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        report["warnings"].append(f"{path.name}: {e}")
        return default
    report[format_key] = fmt
    return normalized


def migrate_project(project_dir: str | Path, dry_run: bool = False) -> dict[str, Any]:
    """Migrate 3JSON → story-state.json for one project directory.

    Reads chapter_appearances.json, character_roster.json and hooks.json
    (each optional) plus bible.json for the novel title, builds a
    ``StoryState`` and saves it unless ``dry_run`` is set.

    Args:
        project_dir: Directory of the novel project.
        dry_run: When true, build the state but do not call ``state.save``.

    Returns:
        A report dict with the migrated counts, detected formats, warnings,
        notes and the output path (or a dry-run placeholder).
    """
    project = Path(project_dir)
    report: dict[str, Any] = {
        "project": project.name,
        "dry_run": dry_run,
        "warnings": [],
        "characters_migrated": 0,
        "chapters_migrated": 0,
        "hooks_migrated": 0,
        "notes": "",
    }

    # Novel title comes from bible.json when available, else the folder name.
    title = project.name
    bible_path = project / "bible.json"
    if bible_path.exists():
        try:
            # json.loads, not json.load: read_text() returns a str, not a file.
            bible = json.loads(bible_path.read_text(encoding="utf-8"))
        except (ValueError, OSError) as e:
            report["warnings"].append(f"bible.json: {e}")
        else:
            if isinstance(bible, dict):
                title = bible.get("title") or title

    # ── Load the three legacy files ───────────────────────────
    chapters: dict[int, dict] = _load_legacy_json(
        project / "chapter_appearances.json", _normalize_chapters, {}, report, "chapter_format")
    characters: dict[str, dict] = _load_legacy_json(
        project / "character_roster.json", _normalize_characters, {}, report, "character_format")
    hooks: list[dict] = _load_legacy_json(
        project / "hooks.json", _normalize_hooks, [], report, "hook_format")

    # ── Build StoryState ──────────────────────────────────────
    state = StoryState(novel_title=title, volume=project.name)

    # Characters
    for cid, c in characters.items():
        state.characters[cid] = CharacterState(
            id=c["id"],
            name=c["name"],
            role=c["role"],
            first_appearance=c["first_appearance"],
            last_appearance=c["last_appearance"],
            status=c["status"],
            state=c["state"],
            key_items=c["key_items"],
        )
    report["characters_migrated"] = len(characters)

    # Chapters, in ascending chapter order
    for num in sorted(chapters):
        ch = chapters[num]
        state.chapters[num] = ChapterRecord(
            number=num,
            title=ch["title"],
            word_count=ch["word_count"],
            scene=ch["scene"],
            characters_present=ch["characters_present"],
            key_events=ch["key_events"],
            strand_weights={},
        )
    report["chapters_migrated"] = len(chapters)
    if chapters:
        report["notes"] += f"章节范围 Ch{min(chapters)}–Ch{max(chapters)}"

    # Hooks — stored by ID, so a repeated ID replaces the earlier hook.
    seen_hook_ids: set[str] = set()
    for h in hooks:
        hid = h["id"]
        if hid in seen_hook_ids:
            report["warnings"].append(
                f"hooks.json: duplicate hook id {hid!r} overwrites an earlier hook")
        seen_hook_ids.add(hid)
        state.hooks[hid] = HookState(
            id=hid,
            description=h["description"],
            type=h["type"],
            chapter_created=h["chapter_created"],
            chapter_resolved=h.get("chapter_resolved"),
            status=h["status"],
            expected_payoff=h["expected_payoff"],
        )
    # Count distinct IDs so the report matches what was actually stored.
    report["hooks_migrated"] = len(seen_hook_ids)

    # Strands (rough estimate — real strand weights should be set after migration)
    if chapters:
        # Chapters that list key_events count towards the quest strand.
        quest_count = sum(1 for ch in chapters.values() if ch.get("key_events"))
        quest_ratio = round(quest_count / len(chapters), 2)
        # The remainder is split equally between fire and constellation.
        remaining = 1.0 - quest_ratio
        state.strands = StrandState(
            quest_ratio=quest_ratio,
            fire_ratio=round(remaining / 2, 2),
            constellation_ratio=round(remaining / 2, 2),
        )

    # ── Save ──────────────────────────────────────────────────
    if not dry_run:
        state.save(str(project))
        report["story_state_path"] = str(project / "story-state.json")
    else:
        report["story_state_path"] = "(dry-run, not written)"

    return report


def check_project(project_dir: str | Path) -> dict[str, Any]:
    """Report which 3JSON files exist in a project and whether they parse.

    Args:
        project_dir: Directory of the novel project.

    Returns:
        ``{"project": <dir name>, "files": {filename: status}}`` where status
        is "missing", a "✅ ..." summary (detected format, top-level item
        count, size) or a "❌ ..." message for files that cannot be read,
        decoded or parsed. Nothing is written.
    """
    project = Path(project_dir)
    result: dict[str, Any] = {"project": project.name, "files": {}}
    for fn in ["chapter_appearances.json", "character_roster.json", "hooks.json", "story-state.json"]:
        path = project / fn
        if not path.exists():
            result["files"][fn] = "missing"
            continue
        try:
            size = path.stat().st_size
            data = json.loads(path.read_text(encoding="utf-8"))
        except OSError as e:
            status = f"❌ unreadable ({e})"
        except ValueError:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors;
            # stat() has succeeded by the time either can be raised.
            status = f"❌ invalid JSON ({size:,} bytes)"
        else:
            fmt = _detect_format(data, fn)
            count = len(data) if isinstance(data, (list, dict)) else 0
            status = f"✅ {fmt} ({count} items, {size:,} bytes)"
        result["files"][fn] = status
    return result


# ═══════════════════════════════════════════════════════════════
#  CLI
# ═══════════════════════════════════════════════════════════════

def _print_report(report: dict) -> None:
    """Pretty-print a migration report to stdout.

    Args:
        report: Report dict with at least "project", "characters_migrated",
            "chapters_migrated" and "hooks_migrated". The keys "dry_run",
            "notes", the three "*_format" entries, "warnings" and
            "story_state_path" are optional.
    """
    dry = " (DRY RUN)" if report.get("dry_run") else ""
    print(f"\n{'='*50}")
    print(f"  迁移报告: {report['project']}{dry}")
    print(f"{'='*50}")
    print(f"  角色: {report['characters_migrated']} 个")
    print(f"  章节: {report['chapters_migrated']} 个 {report.get('notes', '')}")
    print(f"  伏笔: {report['hooks_migrated']} 个")
    if report.get("chapter_format"):
        print(f"  chapter格式: {report['chapter_format']}")
    if report.get("character_format"):
        print(f"  character格式: {report['character_format']}")
    if report.get("hook_format"):
        print(f"  hook格式: {report['hook_format']}")
    # Warnings are optional like the other extras, so read them with .get.
    warnings = report.get("warnings") or []
    if warnings:
        print(f"\n  ⚠️ 警告 ({len(warnings)}):")
        for w in warnings:
            print(f"    - {w}")
    # The fallback placeholder previously lacked its closing parenthesis.
    print(f"  输出: {report.get('story_state_path', '(无)')}")
    print()


def _main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit code.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read sys.argv.

    Returns:
        0 on success, 1 when neither a project directory nor ``--all`` was
        given (the help text is printed). A missing novels/ directory with
        ``--all`` is reported through ``parser.error``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="3JSON → story-state.json 一键迁移")
    parser.add_argument("project_dir", nargs="?", help="小说项目目录")
    parser.add_argument("--all", action="store_true", help="迁移 novels/ 下所有项目")
    parser.add_argument("--check", action="store_true", help="只检查不写入")
    parser.add_argument("--dry-run", action="store_true", help="预览迁移结果但不写入")
    args = parser.parse_args(argv)

    novels_dir = Path(__file__).resolve().parent.parent.parent.parent / "novels"

    # Both --check and migration need a target: a directory or --all.
    if not args.all and not args.project_dir:
        parser.print_help()
        return 1

    subdirs: list[Path] = []
    if args.all:
        if not novels_dir.is_dir():
            parser.error(f"novels directory not found: {novels_dir}")
        subdirs = [novels_dir / d for d in sorted(os.listdir(novels_dir))
                   if (novels_dir / d).is_dir()]

    if args.check:
        targets = subdirs if args.all else [Path(args.project_dir)]
        for t in targets:
            r = check_project(t)
            print(f"\n📁 {r['project']}")
            for fn, status in r["files"].items():
                print(f"  {fn:<35} {status}")
        return 0

    if args.all:
        # Only projects that have a chapter_appearances.json are migrated.
        targets = [d for d in subdirs if (d / "chapter_appearances.json").exists()]
    else:
        targets = [args.project_dir]

    for t in targets:
        report = migrate_project(t, dry_run=args.dry_run)
        _print_report(report)
    return 0


if __name__ == "__main__":
    sys.exit(_main())