文件预览

skill.py

查看 pdf-fin-parse 技能包中的文件内容。

文件内容

scripts/skill.py

"""pdf-finance-parser CLI 主入口

参考 byted-las-document-parse 的工程范式:
  - 所有过程日志走 stderr,stdout 仅输出一行结构化 JSON
  - 通过子命令分阶段编排:info / parse / submit / check-and-notify
  - 不做规则解析,所有"理解"委托给多模态视觉模型(vlm_client)

环境变量加载优先级:
  1) --env-file 显式指定(强制覆盖已有变量)
  2) skill 目录下 env.sh(不覆盖)
  3) 当前工作目录 env.sh(不覆盖)
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
import time
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from threading import Lock
from typing import Any, Dict, Optional

# 同目录模块(运行 `python skill.py` 时 scripts/ 在 sys.path)
sys.path.insert(0, str(Path(__file__).parent))

from vlm_client import (  # noqa: E402
    VLMError, VLMConfig, vlm_config_from_env, parse_page_via_vlm,
    repair_cells_via_vlm,
)
from pdf_renderer import render_pdf_pages, count_pdf_pages              # noqa: E402
from prompts import build_prompt, list_modes                            # noqa: E402
from rule_extractor import analyze_page, extract_page_via_rules         # noqa: E402
from save_utils import (                                                # noqa: E402
    save_per_page, save_final_outputs, build_summary, extract_preview,
)


# ---------------------------------------------------------------------------
# 日志
# ---------------------------------------------------------------------------

def _log(msg: str) -> None:
    print(msg, file=sys.stderr)


def _err(msg: str) -> None:
    print(f"✗ {msg}", file=sys.stderr)


# ---------------------------------------------------------------------------
# env.sh 加载(与 byted-las 同款)
# ---------------------------------------------------------------------------

_ENV_LINE_RE = re.compile(
    r"""^\s*(?:export\s+)?(\w+)\s*=\s*(?:"([^"]*)"|'([^']*)'|(\S+?))\s*(?:#.*)?$"""
)


def _load_env_file(env_file: Path) -> Dict[str, str]:
    if not env_file.exists():
        return {}
    out: Dict[str, str] = {}
    for line in env_file.read_text(encoding="utf-8", errors="ignore").splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        m = _ENV_LINE_RE.match(line)
        if m:
            k = m.group(1)
            v = m.group(2) or m.group(3) or m.group(4) or ""
            if v:
                out[k] = v
    return out


def _auto_load_env(cli_env_file: Optional[str] = None) -> None:
    """显式 --env-file 强制覆盖;自动发现的不覆盖。"""
    if cli_env_file:
        p = Path(cli_env_file).resolve()
        if p.exists():
            for k, v in _load_env_file(p).items():
                os.environ[k] = v
            _log(f"已加载 env: {p}")
            return
        _log(f"警告: 指定的 env 文件不存在: {cli_env_file}")

    skill_dir = Path(__file__).resolve().parent.parent
    for candidate in (skill_dir / "env.sh", Path.cwd() / "env.sh"):
        if candidate.exists():
            for k, v in _load_env_file(candidate).items():
                os.environ.setdefault(k, v)
            _log(f"已自动加载 env: {candidate}")
            return


# ---------------------------------------------------------------------------
# 页范围
# ---------------------------------------------------------------------------

def _parse_page_range(spec: Optional[str], total: int) -> list[int]:
    if not spec:
        return list(range(1, total + 1))
    out: list[int] = []
    for part in spec.split(","):
        part = part.strip()
        if "-" in part:
            a, b = part.split("-", 1)
            out.extend(range(int(a), int(b) + 1))
        else:
            out.append(int(part))
    return sorted(p for p in set(out) if 1 <= p <= total)


# ---------------------------------------------------------------------------
# 核心:单文档解析(sync)
# ---------------------------------------------------------------------------

VALID_ROUTING_MODES = ("auto", "rules", "ai")


def _route_page(hints: Dict[str, Any]) -> str:
    """根据 analyze_page 给出的 hints 决定 per-page 路由。

    返回:"rules" / "vlm-normal" / "vlm-detail"

    决策树(保守,宁可路由给 VLM 也不让规则路径吃下复杂表):
      1) 文本密度 < 50 → vlm-normal(扫描/图像页)
      2) 有边框表 OR 复杂表 → vlm-normal
      3) v ≥ 5 (无边框表纵向数字流) OR h ≥ 3 (横向数字表行) → vlm-normal
      4) 多栏排版 → vlm-normal
      5) 否则 → rules
    """
    if hints["text_density"] < 50:
        return "vlm-normal"
    if hints["n_tables"] > 0 or hints["has_complex_table"]:
        return "vlm-normal"
    if (hints["numeric_lines_vertical"] >= 5
            or hints["numeric_lines_horizontal"] >= 3):
        return "vlm-normal"
    if hints["is_multi_column"]:
        return "vlm-normal"
    return "rules"


def _collect_repair_candidates(page_result: Dict[str, Any]) -> list[Dict[str, Any]]:
    """从单页结果挑出"text 含数字、但 value=null"的 cells;返回 [{id, text}],
    同时把 (table_idx, row, col) 记到 id↔位置 的映射通过 id 传递。

    id 编码:table_idx * 10000 + row * 100 + col(同页内唯一即可)
    """
    candidates = []
    for ti, t in enumerate(page_result.get("tables") or []):
        for c in t.get("cells") or []:
            text = (c.get("text") or "").strip()
            if not text or c.get("value") is not None:
                continue
            if not any(ch.isdigit() for ch in text):
                continue
            cid = ti * 10000 + c.get("row", 0) * 100 + c.get("col", 0)
            candidates.append({"id": cid, "text": text})
    return candidates


def _apply_repair(page_result: Dict[str, Any],
                  fixes: Dict[int, Dict[str, Any]]) -> int:
    """把 repair 返回的 {id: {value, unit}} 写回 page_result.tables[].cells;返回写入数。"""
    applied = 0
    for ti, t in enumerate(page_result.get("tables") or []):
        for c in t.get("cells") or []:
            cid = ti * 10000 + c.get("row", 0) * 100 + c.get("col", 0)
            fix = fixes.get(cid)
            if not fix:
                continue
            new_value = fix.get("value")
            if new_value is None:
                continue  # 模型还是没解析出来,跳过
            c["value"] = new_value
            if fix.get("unit") is not None and c.get("unit") is None:
                c["unit"] = fix["unit"]
            applied += 1
    return applied


def _process_single_page(
    *,
    page_no: int,
    route: str,
    img_bytes: Optional[bytes],
    pdf_path: str,
    password: Optional[str],
    prompt_map: Dict[str, str],
    cfg: VLMConfig,
    repair: bool,
    images_dir: Optional[Path],
    pages_dir: Path,
) -> tuple[Optional[Dict[str, Any]], Optional[Dict[str, Any]]]:
    """单页处理:按 route 调规则路径 或 VLM 路径 → (可选)repair → 落盘。

    返回 (page_result, failure)。两者互斥:成功返 (result, None),失败返 (None, failure)。
    route ∈ {"rules", "vlm-normal", "vlm-detail"}
    """
    if route == "rules":
        try:
            page_result = extract_page_via_rules(pdf_path, page_no, password=password)
            save_per_page(pages_dir, page_no, page_result)
            return page_result, None
        except Exception as e:                           # noqa: BLE001
            _err(f"page {page_no} rules failed: {e}")
            return None, {"page": page_no, "reason": f"RulesError: {e}"}

    # VLM 分支
    assert img_bytes is not None, "VLM route requires rendered image"
    if images_dir:
        (images_dir / f"p{page_no}.png").write_bytes(img_bytes)
    parse_mode = "detail" if route == "vlm-detail" else "normal"
    try:
        page_result = parse_page_via_vlm(
            image_bytes=img_bytes,
            prompt=prompt_map[parse_mode],
            cfg=cfg,
            page_no=page_no,
        )
        page_result["engine"] = f"vlm-{parse_mode}"     # 覆盖 vlm_client 默认的 vlm-{provider}
        if repair:
            cand = _collect_repair_candidates(page_result)
            if cand:
                _log(f"[repair] page {page_no}: {len(cand)} cell(s) need value, "
                     f"calling model again ...")
                try:
                    fixes = repair_cells_via_vlm(cells=cand, cfg=cfg)
                    applied = _apply_repair(page_result, fixes)
                    _log(f"[repair] page {page_no}: applied {applied}/{len(cand)}")
                except VLMError as e:
                    _log(f"[repair] page {page_no} repair failed (continue): {e}")
        save_per_page(pages_dir, page_no, page_result)
        return page_result, None
    except VLMError as e:
        _err(f"page {page_no} failed: {e}")
        return None, {"page": page_no, "reason": f"VLMError: {e}"}


def _plan_routes(input_path: str, pages: list[int],
                 routing: str, password: Optional[str]
                 ) -> Dict[int, tuple[str, Dict[str, Any]]]:
    """对每一页做 analyze + route,返回 {page_no: (route, hints)}。

    routing="ai"    → 所有页强制 vlm-normal(与旧行为完全兼容)
    routing="rules" → 所有页强制 rules(基线对照用)
    routing="auto"  → 按 _route_page 决策
    """
    plan: Dict[int, tuple[str, Dict[str, Any]]] = {}
    for pn in pages:
        if routing == "ai":
            plan[pn] = ("vlm-normal", {})
            continue
        if routing == "rules":
            plan[pn] = ("rules", {})
            continue
        # auto
        hints = analyze_page(input_path, pn, password=password)
        plan[pn] = (_route_page(hints), hints)
    return plan


def run_parse(
    *,
    input_path: str,
    output_dir: str,
    parse_mode: str = "normal",
    pages_spec: Optional[str] = None,
    model: Optional[str] = None,
    dpi: int = 200,
    save_images: bool = False,
    password: Optional[str] = None,
    task_id: Optional[str] = None,
    repair: bool = False,
    concurrency: int = 1,
    routing: str = "auto",
) -> Dict[str, Any]:
    """端到端解析:路由 → 渲染(仅 VLM 页) → 调模型/规则 → 落盘。

    routing="auto" (默认): per-page 路由(规则页跑本地,复杂页跑 VLM)
    routing="ai":         所有页 vlm-normal(与重构前完全一致)
    routing="rules":      所有页规则(基线,用于评测对照)

    Returns: summary dict(供 stdout 输出)
    """
    if routing not in VALID_ROUTING_MODES:
        raise ValueError(f"routing must be one of {VALID_ROUTING_MODES}; got {routing!r}")

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    task_id = task_id or f"task_{uuid.uuid4().hex[:12]}"
    t0 = time.time()

    cfg = vlm_config_from_env()
    if cfg is None and routing != "rules":
        raise RuntimeError(
            "VLM 未配置:请设置 ARK_API_KEY 或 OPENAI_API_KEY(参考 env.sh.example);"
            "若只想跑规则路径,加 --routing rules"
        )
    if cfg is not None and model:
        cfg.model = model

    total = count_pdf_pages(input_path, password=password)
    pages = _parse_page_range(pages_spec, total)
    _log(f"[parse] doc={Path(input_path).name} total={total} target_pages={len(pages)} "
         f"routing={routing} parse_mode={parse_mode} "
         f"model={cfg.model if cfg else '-'} concurrency={concurrency} task_id={task_id}")

    # 1) 路由阶段:对每页做 sub-second 分析 + 决策
    plan = _plan_routes(input_path, pages, routing, password)
    route_counts: Dict[str, int] = {}
    for pn, (route, _) in plan.items():
        route_counts[route] = route_counts.get(route, 0) + 1
    _log(f"[plan] routes: {route_counts}")

    # Prompt map for VLM routes
    prompt_map = {m: build_prompt(m) for m in list_modes()}

    per_page_results: list[dict] = []
    failed: list[dict] = []

    pages_dir = out_dir / "pages"
    pages_dir.mkdir(parents=True, exist_ok=True)
    images_dir = (out_dir / "images") if save_images else None
    if images_dir:
        images_dir.mkdir(parents=True, exist_ok=True)

    # 2) 只渲染 VLM 路由的页(规则页不需要图像)
    vlm_pages = [pn for pn, (r, _) in plan.items() if r != "rules"]
    rendered_map: Dict[int, bytes] = {}
    if vlm_pages:
        for pn, img in render_pdf_pages(input_path, vlm_pages, dpi=dpi, password=password):
            rendered_map[pn] = img
        _log(f"[parse] rendered {len(rendered_map)} VLM pages, dispatching ...")
    else:
        _log("[parse] all pages route to rules (no VLM calls)")

    progress_lock = Lock()
    done_count = [0]

    def _on_one(page_no: int):
        route, _hints = plan[page_no]
        img = rendered_map.get(page_no)
        result, failure = _process_single_page(
            page_no=page_no, route=route, img_bytes=img,
            pdf_path=input_path, password=password,
            prompt_map=prompt_map, cfg=cfg,
            repair=repair, images_dir=images_dir, pages_dir=pages_dir,
        )
        with progress_lock:
            done_count[0] += 1
            _log(f"[parse] ({done_count[0]}/{len(pages)}) page={page_no} "
                 f"route={route} done")
        return page_no, result, failure

    if concurrency <= 1:
        for page_no in pages:
            route, _ = plan[page_no]
            _log(f"[parse] page={page_no} route={route} ...")
            _, result, failure = _on_one(page_no)
            if result is not None:
                per_page_results.append(result)
            else:
                failed.append(failure)
    else:
        with ThreadPoolExecutor(max_workers=concurrency,
                                thread_name_prefix="route") as pool:
            futures = [pool.submit(_on_one, pn) for pn in pages]
            for f in as_completed(futures):
                _, result, failure = f.result()
                if result is not None:
                    per_page_results.append(result)
                else:
                    failed.append(failure)

    # 并发情况下乱序,按 page_no 排回去(save_final_outputs 假定有序)
    per_page_results.sort(key=lambda r: r.get("page_no", 0))
    failed.sort(key=lambda f: f.get("page", 0))

    summary = build_summary(
        input_path=input_path,
        output_dir=str(out_dir),
        task_id=task_id,
        parse_mode=parse_mode,
        model=cfg.model if cfg else "-",
        total_pages=total,
        per_page_results=per_page_results,
        failed_pages=failed,
        wall_time_seconds=round(time.time() - t0, 3),
    )
    # 路由 breakdown 注入 summary,给 meta.json 也带上
    summary["routing"] = {
        "mode": routing,
        "page_routes": {pn: r for pn, (r, _) in plan.items()},
        "counts": route_counts,
    }
    save_final_outputs(out_dir, summary, per_page_results)
    summary["preview"] = extract_preview(summary["markdown_size_bytes"], out_dir)
    return summary


# ---------------------------------------------------------------------------
# 异步语义包装:submit + check-and-notify
# ---------------------------------------------------------------------------

def cmd_submit(args: argparse.Namespace) -> int:
    """submit:注册一个 task_id,把参数写到 <output_dir>/task.json,立刻返回。

    后续 check-and-notify 用同一个 output_dir 跑实际解析(这样设计是为了和 byted-las
    的 async 形态对齐——若日后换成真正的远端异步算子,这里只需替换执行体)。
    """
    task_id = f"task_{uuid.uuid4().hex[:12]}"
    out_dir = Path(args.output or f"/tmp/pdf_finance_parse_{task_id}")
    out_dir.mkdir(parents=True, exist_ok=True)
    task_meta = {
        "task_id": task_id,
        "input": args.input,
        "parse_mode": args.parse_mode,
        "pages": args.pages,
        "model": args.model,
        "dpi": args.dpi,
        "save_images": args.save_images,
        "repair": args.repair,
        "concurrency": args.concurrency,
        "routing": args.routing,
        "submitted_at": time.time(),
    }
    (out_dir / "task.json").write_text(
        json.dumps(task_meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    total = count_pdf_pages(args.input, password=args.password)
    pages = _parse_page_range(args.pages, total)
    eta = _estimate_eta(len(pages), args.parse_mode)
    result = {
        "task_id": task_id,
        "output_dir": str(out_dir),
        "eta": eta,
        "total_pages": total,
        "target_pages": len(pages),
        "parse_mode": args.parse_mode,
    }
    print(json.dumps(result, ensure_ascii=False))
    return 0


def cmd_check_and_notify(args: argparse.Namespace) -> int:
    """check-and-notify:执行 submit 时登记的解析任务,落盘结果,输出 summary。

    --poll 在本实现下是 no-op(本地同步执行无需轮询),保留参数兼容 byted-las 风格。
    """
    out_dir = Path(args.output or f"/tmp/pdf_finance_parse_{args.task_id}")
    task_file = out_dir / "task.json"
    if not task_file.exists():
        _err(f"找不到 task 元数据:{task_file}(请先 submit)")
        return 1
    task_meta = json.loads(task_file.read_text(encoding="utf-8"))
    try:
        summary = run_parse(
            input_path=task_meta["input"],
            output_dir=str(out_dir),
            parse_mode=task_meta.get("parse_mode", "normal"),
            pages_spec=task_meta.get("pages"),
            model=task_meta.get("model"),
            dpi=task_meta.get("dpi", 200),
            save_images=task_meta.get("save_images", False),
            password=args.password,
            task_id=task_meta["task_id"],
            repair=task_meta.get("repair", False),
            concurrency=task_meta.get("concurrency", 1),
            routing=task_meta.get("routing", "auto"),
        )
    except Exception as e:
        _err(f"解析失败: {e}")
        print(json.dumps(
            {"status": "FAILED", "task_id": task_meta["task_id"], "error_msg": str(e)},
            ensure_ascii=False,
        ))
        return 1
    print(json.dumps(summary, ensure_ascii=False))
    return 0 if summary["status"] == "COMPLETED" else 1


def cmd_parse(args: argparse.Namespace) -> int:
    """同步 parse:一次调用完成所有工作。"""
    try:
        summary = run_parse(
            input_path=args.input,
            output_dir=args.output,
            parse_mode=args.parse_mode,
            pages_spec=args.pages,
            model=args.model,
            dpi=args.dpi,
            save_images=args.save_images,
            password=args.password,
            repair=args.repair,
            concurrency=args.concurrency,
            routing=args.routing,
        )
    except Exception as e:
        _err(f"解析失败: {e}")
        print(json.dumps(
            {"status": "FAILED", "error_msg": str(e)}, ensure_ascii=False
        ))
        return 1
    print(json.dumps(summary, ensure_ascii=False))
    return 0 if summary["status"] == "COMPLETED" else 1


def cmd_parse_las(args: argparse.Namespace) -> int:
    """v0.3 P1:LAS 算子解析(异步 submit/poll),落盘 markdown + 完整响应。

    输出目录结构:
      {output}/
        result.md          # 整篇 markdown(表格是 HTML <table>)
        result.full.json   # 完整 LAS 响应(含 detail[].text_blocks)
        meta.json          # 任务元信息
        pages/p{N}.md      # 每页 markdown(便于评测/对比)
    """
    # 懒加载,避免 v0.2 路径下因缺 requests/tos 报错
    from las_client import submit as las_submit, wait_completion, LASError
    from tos_uploader import resolve_input, TOSConfigError

    t_start = time.time()
    out_dir = Path(args.output)
    out_dir.mkdir(parents=True, exist_ok=True)
    pages_dir = out_dir / "pages"
    pages_dir.mkdir(exist_ok=True)

    # --pages 1-10 → start_page=1, num_pages=10
    start_page = 1
    num_pages: Optional[int] = None
    if args.pages:
        spec = args.pages.strip()
        if "," in spec:
            raise SystemExit("LAS 仅支持连续页范围(如 1-10),不支持 1,3,5 离散页")
        if "-" in spec:
            a, b = spec.split("-", 1)
            start_page = int(a)
            num_pages = int(b) - int(a) + 1
        else:
            start_page = int(spec)
            num_pages = 1

    # 1. 输入解析(本地 PDF → 上传 TOS;远程/tos 直通)
    try:
        final_url, input_meta = resolve_input(
            args.input,
            bucket=args.tos_bucket,
            prefix=args.tos_prefix,
            region=args.region,
        )
    except (TOSConfigError, FileNotFoundError, ValueError) as e:
        _err(f"输入解析失败: {e}")
        return 2

    _log(f"[parse-las] input_type={input_meta.get('input_type')} "
         f"final_url={final_url[:80]}")

    # 2. 提交 LAS 任务
    try:
        task_id = las_submit(
            final_url,
            parse_mode=args.parse_mode,
            start_page=start_page,
            num_pages=num_pages,
            region=args.region,
        )
    except LASError as e:
        _err(f"LAS submit 失败: {e}")
        return 1

    # 3. 阻塞轮询直到终态
    try:
        resp = wait_completion(
            task_id,
            region=args.region,
            poll_interval=args.poll_interval,
            max_attempts=args.max_poll_attempts,
        )
    except LASError as e:
        _err(f"LAS poll 失败: {e}")
        # 把已知的 task_id 也输出,便于手动恢复
        print(json.dumps({"status": "FAILED", "task_id": task_id,
                          "error_msg": str(e)}, ensure_ascii=False))
        return 1

    # 4. 落盘
    data = resp.get("data") or {}
    markdown = data.get("markdown") or ""
    detail = data.get("detail") or []

    (out_dir / "result.md").write_text(markdown, encoding="utf-8")
    (out_dir / "result.full.json").write_text(
        json.dumps(resp, ensure_ascii=False, indent=2), encoding="utf-8"
    )
    for page in detail:
        pid = page.get("page_id")
        page_md = page.get("page_md") or ""
        if pid is not None:
            (pages_dir / f"p{pid}.md").write_text(page_md, encoding="utf-8")

    table_count = markdown.count("<table")
    image_count = sum(
        1 for p in detail for b in (p.get("text_blocks") or [])
        if b.get("label") == "image"
    )
    page_count = len(detail)

    # 5. v0.3 金融后处理层(best-effort:失败不阻塞 LAS 原始落盘)
    postprocess_summary: Dict[str, Any] = {"applied": False}
    if not getattr(args, "no_postprocess", False):
        try:
            from html_table_parser import parse_page_tables
            from cross_page_merger import build_page_tables, merge_across_pages
            from output_writer import build_output_doc, write_output_files

            page_table_pairs: list = []
            page_context_map: Dict[int, str] = {}
            for page in detail:
                pid = page.get("page_id")
                page_md = page.get("page_md") or ""
                if pid is None:
                    continue
                page_tables = parse_page_tables(page_md)
                for t in page_tables:
                    page_table_pairs.append((pid, t))
                # 用首张表的 preface 作整页 context(单位声明通常在首张表上方)
                if page_tables:
                    page_context_map[pid] = page_tables[0].context_text()

            pts = build_page_tables(page_table_pairs)
            merged_tables = merge_across_pages(pts)

            merged_with_ctx = []
            for mt in merged_tables:
                ctx = ""
                if mt.source_pages:
                    ctx = page_context_map.get(mt.source_pages[0], "")
                merged_with_ctx.append((mt, ctx))

            doc_id = Path(args.input).stem
            output_doc = build_output_doc(
                las_resp=resp,
                merged_tables_with_ctx=merged_with_ctx,
                doc_id=doc_id,
                source_file=args.input,
                pages=page_count,
                engine="vlm-ark",   # schema enum 受限;表征"经 LAS+后处理"
            )
            wrote = write_output_files(out_dir, output_doc, markdown)
            postprocess_summary = {
                "applied": True,
                "merged_table_count": len(merged_tables),
                "raw_table_count": len(page_table_pairs),
                "balance_sheet_count": sum(
                    1 for t in output_doc["tables"]
                    if t.get("statement_type") == "balance_sheet"
                ),
                "validation_warning_count": sum(
                    len((t.get("validation") or {}).get("warnings", []))
                    for t in output_doc["tables"]
                ),
                "output_json": wrote["output_json"],
            }
        except Exception as e:
            _err(f"后处理失败(保留 LAS 原始输出): {type(e).__name__}: {e}")
            postprocess_summary = {"applied": False, "error": str(e)}

    wall = round(time.time() - t_start, 2)

    meta = {
        "status": "COMPLETED",
        "task_id": task_id,
        "engine": "las/las_pdf_parse_doubao",
        "parse_mode": args.parse_mode,
        "region": args.region,
        "input": args.input,
        "input_type": input_meta.get("input_type"),
        "input_size_mb": input_meta.get("size_mb"),
        "input_pages": input_meta.get("pages"),
        "start_page": start_page,
        "num_pages": num_pages,
        "output_dir": str(out_dir),
        "page_count": page_count,
        "table_count": table_count,
        "image_count": image_count,
        "markdown_size_bytes": len(markdown.encode("utf-8")),
        "wall_time_seconds": wall,
        "postprocess": postprocess_summary,
    }
    (out_dir / "meta.json").write_text(
        json.dumps(meta, ensure_ascii=False, indent=2), encoding="utf-8"
    )

    # 5. 单行 JSON 摘要到 stdout
    summary = dict(meta)
    summary["preview"] = markdown[:300]
    print(json.dumps(summary, ensure_ascii=False))
    return 0


def cmd_info(args: argparse.Namespace) -> int:
    """info:打印当前环境配置(API key 是否就绪、用哪个模型/endpoint),仅 stderr。"""
    cfg = vlm_config_from_env()
    _log(f"available_modes: {', '.join(list_modes())}")
    if cfg is None:
        _log("VLM: NOT configured (set ARK_API_KEY or OPENAI_API_KEY)")
        return 1
    _log(f"VLM provider: {cfg.provider}")
    _log(f"VLM model:    {cfg.model}")
    _log(f"VLM base_url: {cfg.base_url or '(default)'}")
    _log(f"VLM timeout:  {cfg.timeout_seconds}s, retries={cfg.max_retries}")
    return 0


# ---------------------------------------------------------------------------
# 预估耗时
# ---------------------------------------------------------------------------

_ETA_PER_PAGE = {"normal": (3, 8), "detail": (6, 15)}


def _estimate_eta(num_pages: int, mode: str) -> str:
    lo, hi = _ETA_PER_PAGE.get(mode, (3, 8))
    lo_total = lo * num_pages
    hi_total = hi * num_pages

    def _fmt(s: int) -> str:
        if s < 60:
            return f"{s}秒"
        return f"{s // 60}分{s % 60}秒"
    return f"{_fmt(lo_total)}~{_fmt(hi_total)} ({num_pages}页·{mode})"


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def build_parser() -> argparse.ArgumentParser:
    p = argparse.ArgumentParser(
        prog="skill.py", description="pdf-finance-parser CLI (AI-first)"
    )
    p.add_argument("--env-file", help="env.sh 路径(强制覆盖已有环境变量)")
    sp = p.add_subparsers(dest="cmd")

    common = argparse.ArgumentParser(add_help=False)
    common.add_argument("--parse-mode", default="normal",
                        choices=list_modes(), help="解析模式")
    common.add_argument("--pages", default=None,
                        help="页范围,如 1-10,15,20-25")
    common.add_argument("--model", default=None,
                        help="覆盖默认模型名(如 doubao-1.5-vision-pro)")
    common.add_argument("--dpi", type=int, default=200,
                        help="PDF 渲染 DPI(默认 200)")
    common.add_argument("--save-images", action="store_true",
                        help="保留渲染后的页图到 images/")
    common.add_argument("--password", default=None, help="加密 PDF 的密码")
    common.add_argument("--repair", action="store_true",
                        help="二次精修:对 text 含数字但 value=null 的 cell 再调一次模型补齐 value/unit")
    common.add_argument("--concurrency", type=int, default=1,
                        help="并发页数(默认 1 串行;建议 4 开始)。"
                             "注意 provider QPM 限制——触发限流会触发指数退避,反而更慢")
    common.add_argument("--routing", default="auto",
                        choices=list(VALID_ROUTING_MODES),
                        help="路由模式:"
                             "auto = 按页特征智能分发(默认);"
                             "ai = 所有页强制 VLM;"
                             "rules = 所有页规则(基线对照)")

    # info
    sp.add_parser("info", help="打印当前 VLM 配置(仅 stderr)")

    # parse (synchronous, one-shot)
    p_parse = sp.add_parser("parse", parents=[common], help="同步解析(一次到底)")
    p_parse.add_argument("--input", required=True, help="PDF/图片路径")
    p_parse.add_argument("--output", required=True, help="结果输出目录")

    # submit (async semantics — pre-register a task)
    p_submit = sp.add_parser("submit", parents=[common], help="提交任务(异步语义)")
    p_submit.add_argument("--input", required=True)
    p_submit.add_argument("--output", help="结果目录(默认 /tmp/pdf_finance_parse_{task_id})")

    # check-and-notify (execute the pre-registered task)
    p_check = sp.add_parser("check-and-notify",
                            help="执行 submit 登记的任务并落盘")
    p_check.add_argument("--task-id", required=True)
    p_check.add_argument("--output", help="task.json 所在目录")
    p_check.add_argument("--poll", action="store_true",
                         help="兼容标志:本实现下同步执行,无需轮询")
    p_check.add_argument("--password", default=None)

    # parse-las (v0.3 路径:LAS 算子解析,绕开 VLM 整页推理 timeout)
    p_las = sp.add_parser("parse-las",
                          help="v0.3:LAS las_pdf_parse_doubao 算子异步解析")
    p_las.add_argument("--input", required=True,
                       help="本地 PDF 路径 / http(s) URL / tos:// 路径")
    p_las.add_argument("--output", required=True, help="结果输出目录")
    p_las.add_argument("--parse-mode", default="normal",
                       choices=["normal", "detail"],
                       help="LAS 模式:normal(默认)/ detail(深度,2x 价)")
    p_las.add_argument("--pages", default=None,
                       help="连续页范围(如 1-10)。"
                            "LAS 仅支持 start_page + num_pages,不支持离散页")
    p_las.add_argument("--region", default="cn-beijing",
                       choices=["cn-beijing", "cn-shanghai"])
    p_las.add_argument("--tos-bucket", default=None,
                       help="覆盖 env 的 TOS_BUCKET(仅本地 PDF 上传时需要)")
    p_las.add_argument("--tos-prefix", default="pdf-finance-parser/uploads")
    p_las.add_argument("--poll-interval", type=int, default=None,
                       help="轮询间隔秒数(默认 10/20/30 智能退避)")
    p_las.add_argument("--max-poll-attempts", type=int, default=60)
    p_las.add_argument("--no-postprocess", action="store_true",
                       help="跳过 v0.3 金融后处理(仅保留 LAS 原始 markdown/JSON 输出)")

    return p


def main(argv: list[str]) -> int:
    parser = build_parser()
    if not argv:
        parser.print_help()
        return 0
    args = parser.parse_args(argv)
    _auto_load_env(getattr(args, "env_file", None))
    if not args.cmd:
        parser.print_help()
        return 0
    if args.cmd == "info":
        return cmd_info(args)
    if args.cmd == "parse":
        return cmd_parse(args)
    if args.cmd == "submit":
        return cmd_submit(args)
    if args.cmd == "check-and-notify":
        return cmd_check_and_notify(args)
    if args.cmd == "parse-las":
        return cmd_parse_las(args)
    parser.print_help()
    return 0


if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))