文件预览

finance_validator.py

查看 pdf-fin-parse 技能包中的文件内容。

文件内容

scripts/finance_validator.py

"""业务规则校验:资产=负债+权益 / 小计=Σ明细。

best-effort:不修改原始数值,发现不一致写到 warnings 字段。
仅对 statement_type == "balance_sheet" 应用资产/负债/权益恒等式。

输入:已经过 numeric_normalizer + finance_terms_aligner 标注后的 cells
(每个 data 行至少有 row_label / group / value,多列时按列分别校验)
"""
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Dict, List, Optional

from finance_terms_aligner import (
    classify_row_label, is_grand_total_row, is_subtotal_row,
)


@dataclass
class Warning:
    code: str            # "asset_eq_liab_plus_equity_mismatch" / "subtotal_mismatch"
    severity: str        # "error" | "warning"
    column_idx: int      # 哪一列发现的
    detail: str
    expected: Optional[float] = None
    actual: Optional[float] = None
    rel_diff: Optional[float] = None


@dataclass
class ValidationReport:
    warnings: List[Warning] = field(default_factory=list)
    stats: Dict[str, int] = field(default_factory=dict)

    def add(self, w: Warning) -> None:
        self.warnings.append(w)
        self.stats[w.code] = self.stats.get(w.code, 0) + 1

    def is_clean(self) -> bool:
        return all(w.severity != "error" for w in self.warnings)


# ---------------------------------------------------------------------------
# 内部:把 cells 转成"按行的 label + 按列的数值"视图
# ---------------------------------------------------------------------------

def _by_row_col(cells) -> Dict[int, Dict[int, dict]]:
    """{row: {col: cell_dict}}, cell_dict 至少含 text/value。"""
    out: Dict[int, Dict[int, dict]] = {}
    for c in cells:
        out.setdefault(c["row"], {})[c["col"]] = c
    return out


# ---------------------------------------------------------------------------
# 校验:资产 = 负债 + 权益(每列独立)
# ---------------------------------------------------------------------------

_TOLERANCE_REL = 1e-3   # 千分之一相对误差(财报舍入到百万元/亿元很常见)
_TOLERANCE_ABS = 1.0    # 绝对误差兜底(单位归一为元,1 元内可忽略)


def _close_enough(actual: float, expected: float) -> bool:
    if expected == 0:
        return abs(actual) <= _TOLERANCE_ABS
    return abs(actual - expected) / abs(expected) <= _TOLERANCE_REL


def validate_balance_sheet(rows: List[dict],
                            data_cols: List[int]) -> ValidationReport:
    """资产负债表恒等式校验。

    Args:
        rows: 已展平的数据行,每行 dict 至少含:
              - row_label: str
              - group: "asset"/"liability"/"equity"/None
              - cells: {col_idx: value or None}
        data_cols: 要校验的数值列索引列表
    """
    rep = ValidationReport()

    # 收集每列的总计行(assets/liabilities/equity)
    totals: Dict[str, Dict[int, float]] = {"asset": {}, "liability": {}, "equity": {}}
    for r in rows:
        grp = is_subtotal_row(r.get("row_label", ""))
        if not grp:
            continue
        for col in data_cols:
            v = r["cells"].get(col)
            if v is None:
                continue
            # 同一 group 多个总计行(应不会发生)以最后一个为准
            totals[grp][col] = v

    # 资产 = 负债 + 权益(按列校验)
    for col in data_cols:
        a = totals["asset"].get(col)
        l = totals["liability"].get(col)
        e = totals["equity"].get(col)
        if a is None or l is None or e is None:
            continue
        if not _close_enough(l + e, a):
            rep.add(Warning(
                code="asset_eq_liab_plus_equity_mismatch",
                severity="error",
                column_idx=col,
                expected=a,
                actual=l + e,
                rel_diff=(l + e - a) / a if a else None,
                detail=f"col={col} asset={a:.2f} liab+equity={l + e:.2f} diff={l + e - a:.2f}",
            ))

    rep.stats["asset_totals_found"] = len(totals["asset"])
    rep.stats["liab_totals_found"] = len(totals["liability"])
    rep.stats["equity_totals_found"] = len(totals["equity"])
    return rep


# ---------------------------------------------------------------------------
# 校验:小计 = Σ 同 group 明细(best-effort,仅当能确定 group 范围时)
# ---------------------------------------------------------------------------

def validate_subtotals(rows: List[dict],
                        data_cols: List[int]) -> ValidationReport:
    """对每个 group(asset/liab/equity):last_subtotal 应 ≈ Σ(明细 rows 该 group 的 value).

    分段规则:
      - **仅** group header("资产:"/"负债:"/"股东权益:")切段
      - subtotal 行**不切段**(金融报表常有"归属于本行X合计/X合计"这种 nested subtotal,
        最后一个 subtotal 才是该 group 的 grand total)
      - 段尾的最后一个 subtotal 行作为 expected total,其余 subtotal 跳过不累计
      - 表尾自动收尾
    """
    rep = ValidationReport()

    segments: List[dict] = []
    current_group: Optional[str] = None
    current_data_rows: List[dict] = []
    current_subtotals: List[dict] = []

    def _close_segment():
        if current_group is None:
            return
        segments.append({
            "group": current_group,
            "rows": list(current_data_rows),
            "subtotals": list(current_subtotals),
        })

    for r in rows:
        label = r.get("row_label", "") or ""
        hdr_group = classify_row_label(label)
        if hdr_group:
            _close_segment()
            current_group = hdr_group
            current_data_rows = []
            current_subtotals = []
            continue
        if current_group is None:
            continue
        # 全表总计("负债和股东权益总计"):既非 subtotal 也非明细,跳过
        if is_grand_total_row(label):
            continue
        # subtotal 行:记录但不切段、不计入明细累加
        if is_subtotal_row(label):
            current_subtotals.append(r)
            continue
        current_data_rows.append(r)
    _close_segment()

    for seg in segments:
        if not seg["subtotals"]:
            continue
        last_sub = seg["subtotals"][-1]
        for col in data_cols:
            expected = last_sub["cells"].get(col)
            if expected is None:
                continue
            actual = sum(
                rr["cells"].get(col)
                for rr in seg["rows"]
                if rr["cells"].get(col) is not None
            )
            if not _close_enough(actual, expected):
                rep.add(Warning(
                    code="subtotal_mismatch",
                    severity="warning",
                    column_idx=col,
                    expected=expected,
                    actual=actual,
                    rel_diff=(actual - expected) / expected if expected else None,
                    detail=(f"group={seg['group']} col={col} "
                            f"subtotal={expected:.2f} sum={actual:.2f} "
                            f"diff={actual - expected:.2f} "
                            f"(data_rows={len(seg['rows'])}, "
                            f"intermediate_subtotals={len(seg['subtotals']) - 1})"),
                ))

    rep.stats["segments_checked"] = len(segments)
    return rep


def validate(rows: List[dict], data_cols: List[int],
              statement_type: Optional[str]) -> ValidationReport:
    """统一入口:按 statement_type 选择适用校验项。"""
    merged = ValidationReport()
    if statement_type == "balance_sheet":
        r1 = validate_balance_sheet(rows, data_cols)
        r2 = validate_subtotals(rows, data_cols)
        for w in r1.warnings + r2.warnings:
            merged.add(w)
        for k, v in {**r1.stats, **r2.stats}.items():
            merged.stats[k] = v
    return merged