文件预览

sensitivity.py

查看 FP-DCF 技能包中的文件内容。

文件内容

fp_dcf/sensitivity.py

from __future__ import annotations

from copy import deepcopy
from math import isfinite

from .engine import run_valuation
from .schemas import SensitivityHeatmapOutput, ValuationSummary


# Metrics a sensitivity heatmap can report. Each key is the attribute name read
# from a valuation summary; each value is the human-readable label copied into
# the heatmap output. Membership in this dict is also the validity check for
# the ``metric`` argument.
_METRIC_LABELS = {
    "per_share_value": "Per Share Value",
    "equity_value": "Equity Value",
    "enterprise_value": "Enterprise Value",
}


def _coerce_float(value) -> float | None:
    if value is None or value == "":
        return None
    try:
        out = float(value)
    except (TypeError, ValueError):
        return None
    return out if isfinite(out) else None


def _clip_rate(value: float | None, *, low: float, high: float) -> float | None:
    if value is None:
        return None
    return min(max(value, low), high)


def _grid_around(center: float, *, range_bps: int, step_bps: int, floor: float, ceiling: float) -> list[float]:
    if step_bps <= 0:
        raise ValueError("step_bps must be positive")
    if range_bps < 0:
        raise ValueError("range_bps must be non-negative")

    values: list[float] = []
    for offset_bps in range(-range_bps, range_bps + 1, step_bps):
        value = center + (offset_bps / 10000.0)
        if floor <= value <= ceiling:
            values.append(round(value, 6))

    values.sort()
    return values


def _metric_value(summary: ValuationSummary, metric: str) -> float | None:
    """Read *metric* from *summary* as a finite float.

    Returns ``None`` when the attribute is absent or cannot be coerced to a
    finite float. Raises ``ValueError`` for a metric name that is not listed
    in ``_METRIC_LABELS``.
    """
    if metric in _METRIC_LABELS:
        raw = getattr(summary, metric, None)
        return _coerce_float(raw)
    raise ValueError(f"Unsupported sensitivity metric: {metric}")


def _steady_state_value(
    *,
    fcff_anchor: float,
    wacc: float,
    terminal_growth_rate: float,
    net_debt: float,
    shares_out: float | None,
) -> ValuationSummary | None:
    if terminal_growth_rate >= wacc:
        return None

    fcff_1 = fcff_anchor * (1.0 + terminal_growth_rate)
    enterprise_value = fcff_1 / (wacc - terminal_growth_rate)
    equity_value = enterprise_value - net_debt
    per_share = equity_value / shares_out if shares_out and shares_out > 0 else None
    return ValuationSummary(
        enterprise_value=enterprise_value,
        equity_value=equity_value,
        per_share_value=per_share,
        terminal_growth_rate=terminal_growth_rate,
        terminal_growth_rate_effective=terminal_growth_rate,
        present_value_stage1=0.0,
        present_value_terminal=enterprise_value,
        terminal_value_share=1.0,
    )


def _two_stage_value(
    *,
    fcff_anchor: float,
    wacc: float,
    high_growth_rate: float,
    high_growth_years: int,
    terminal_growth_rate: float,
    net_debt: float,
    shares_out: float | None,
) -> ValuationSummary | None:
    if terminal_growth_rate >= wacc:
        return None

    years = max(1, int(high_growth_years))
    fcff_t = fcff_anchor
    pv_stage1 = 0.0
    for year in range(1, years + 1):
        fcff_t = fcff_t * (1.0 + high_growth_rate)
        pv_stage1 += fcff_t / ((1.0 + wacc) ** year)

    fcff_terminal = fcff_t * (1.0 + terminal_growth_rate)
    terminal_value = fcff_terminal / (wacc - terminal_growth_rate)
    pv_terminal = terminal_value / ((1.0 + wacc) ** years)
    enterprise_value = pv_stage1 + pv_terminal
    equity_value = enterprise_value - net_debt
    per_share = equity_value / shares_out if shares_out and shares_out > 0 else None
    terminal_share = pv_terminal / enterprise_value if enterprise_value else None

    return ValuationSummary(
        enterprise_value=enterprise_value,
        equity_value=equity_value,
        per_share_value=per_share,
        terminal_growth_rate=terminal_growth_rate,
        terminal_growth_rate_effective=terminal_growth_rate,
        present_value_stage1=pv_stage1,
        present_value_terminal=pv_terminal,
        terminal_value_share=terminal_share,
    )


def build_wacc_terminal_growth_sensitivity(
    payload: dict,
    *,
    metric: str = "per_share_value",
    market_price: float | None = None,
    wacc_values: list[float] | None = None,
    terminal_growth_values: list[float] | None = None,
    wacc_range_bps: int = 200,
    wacc_step_bps: int = 100,
    growth_range_bps: int = 100,
    growth_step_bps: int = 50,
) -> SensitivityHeatmapOutput:
    """Build a WACC x terminal-growth sensitivity matrix around a base valuation.

    The base case comes from ``run_valuation`` on a deep copy of *payload*.
    Every (WACC, terminal growth) pair is then re-priced with the closed-form
    helpers in this module, using the base case's FCFF anchor together with
    ``net_debt`` / ``shares_out`` read from ``payload["fundamentals"]``.

    Args:
        payload: Valuation request; must be a dict.
        metric: Which valuation figure fills the matrix; one of the keys of
            ``_METRIC_LABELS``.
        market_price: Passed through unchanged to the output.
        wacc_values: Explicit WACC axis. When given, values are rounded to
            6 decimals, de-duplicated and sorted; otherwise a grid is built
            around the base WACC, limited to ``[0.01, 0.5]``.
        terminal_growth_values: Explicit growth axis, treated like
            ``wacc_values``; otherwise a grid is built around the base
            terminal growth rate, limited to ``[0.0, 0.15]``.
        wacc_range_bps: Half-width of the generated WACC grid, in basis points.
        wacc_step_bps: Spacing of the generated WACC grid, in basis points.
        growth_range_bps: Half-width of the generated growth grid, in basis points.
        growth_step_bps: Spacing of the generated growth grid, in basis points.

    Returns:
        A ``SensitivityHeatmapOutput`` whose ``matrix[i][j]`` is the metric at
        ``wacc_values[i]`` and ``terminal_growth_values[j]``, or ``None`` for a
        cell that could not be valued.

    Raises:
        TypeError: If *payload* is not a dict.
        ValueError: If *metric* is unsupported, or the base valuation yields no
            value for the metric, no positive WACC, no terminal growth rate or
            no FCFF anchor.
    """
    if not isinstance(payload, dict):
        raise TypeError("payload must be a dict")
    if metric not in _METRIC_LABELS:
        raise ValueError(f"Unsupported sensitivity metric: {metric}")

    # Everything below works on a private copy of the caller's payload.
    base_payload = deepcopy(payload)
    base_output = run_valuation(base_payload)
    base_metric_value = _metric_value(base_output.valuation, metric)
    if base_metric_value is None:
        raise ValueError(
            f"Unable to compute sensitivity metric '{metric}' from the base valuation. "
            "Choose a different metric or provide the missing fundamentals."
        )

    base_wacc = _coerce_float(base_output.wacc_inputs.wacc)
    base_growth = _coerce_float(base_output.valuation.terminal_growth_rate)
    if base_wacc is None or base_wacc <= 0:
        raise ValueError("Base valuation did not produce a positive WACC")
    if base_growth is None:
        raise ValueError("Base valuation did not produce a terminal growth rate")
    base_wacc = round(base_wacc, 6)
    base_growth = round(base_growth, 6)

    fundamentals = base_payload.get("fundamentals") or {}
    assumptions = base_payload.get("assumptions") or {}
    net_debt = _coerce_float(fundamentals.get("net_debt")) or 0.0
    shares_out = _coerce_float(fundamentals.get("shares_out"))
    fcff_anchor = _coerce_float(base_output.fcff.anchor)
    if fcff_anchor is None:
        raise ValueError("Base valuation did not produce an FCFF anchor")

    valuation_model = str(base_output.valuation_model or "steady_state_single_stage")

    # Use the first key that holds a usable number. Chaining the lookups with
    # `or` would treat an explicit growth rate of 0 as "missing" and silently
    # replace it with the 10% default below.
    # NOTE(review): the engine's own handling of these keys is not visible
    # here; confirm it also honours an explicit 0.
    high_growth_rate = None
    for key in ("stage1_growth_rate", "high_growth_rate"):
        high_growth_rate = _coerce_float(assumptions.get(key))
        if high_growth_rate is not None:
            break
    high_growth_rate = _clip_rate(high_growth_rate, low=-0.5, high=1.0)
    if high_growth_rate is None:
        high_growth_rate = 0.10
    # A missing or zero stage length falls back to 5 years.
    high_growth_years = int(
        _coerce_float(assumptions.get("stage1_years") or assumptions.get("high_growth_years")) or 5
    )

    wacc_axis = (
        sorted({round(float(value), 6) for value in wacc_values})
        if wacc_values is not None
        else _grid_around(
            base_wacc,
            range_bps=wacc_range_bps,
            step_bps=wacc_step_bps,
            floor=0.01,
            ceiling=0.5,
        )
    )
    growth_axis = (
        sorted({round(float(value), 6) for value in terminal_growth_values})
        if terminal_growth_values is not None
        else _grid_around(
            base_growth,
            range_bps=growth_range_bps,
            step_bps=growth_step_bps,
            floor=0.0,
            ceiling=0.15,
        )
    )

    use_two_stage = valuation_model == "two_stage"
    invalid_cells = 0
    matrix: list[list[float | None]] = []
    for wacc in wacc_axis:
        row: list[float | None] = []
        for growth in growth_axis:
            if use_two_stage:
                summary = _two_stage_value(
                    fcff_anchor=fcff_anchor,
                    wacc=wacc,
                    high_growth_rate=high_growth_rate,
                    high_growth_years=high_growth_years,
                    terminal_growth_rate=growth,
                    net_debt=net_debt,
                    shares_out=shares_out,
                )
            else:
                summary = _steady_state_value(
                    fcff_anchor=fcff_anchor,
                    wacc=wacc,
                    terminal_growth_rate=growth,
                    net_debt=net_debt,
                    shares_out=shares_out,
                )
            cell = None if summary is None else _metric_value(summary, metric)
            # Count every empty cell, not only growth >= WACC: a cell is also
            # empty when the metric itself is unavailable (e.g. no share count
            # in the payload) or is not a finite number.
            if cell is None:
                invalid_cells += 1
            row.append(cell)
        matrix.append(row)

    # Carry over the base run's messages and tag what this heatmap contains.
    diagnostics = list(base_output.diagnostics)
    warnings = list(base_output.warnings)
    diagnostics.append("sensitivity_heatmap:wacc_x_terminal_growth")
    diagnostics.append(f"sensitivity_metric:{metric}")
    if invalid_cells:
        diagnostics.append(f"sensitivity_invalid_cells:{invalid_cells}")

    return SensitivityHeatmapOutput(
        ticker=base_output.ticker,
        market=base_output.market,
        currency=base_output.currency,
        as_of_date=base_output.as_of_date,
        valuation_model=valuation_model,
        metric=metric,
        metric_label=_METRIC_LABELS[metric],
        base_wacc=base_wacc,
        base_terminal_growth_rate=base_growth,
        base_metric_value=base_metric_value,
        market_price=market_price,
        wacc_values=wacc_axis,
        terminal_growth_values=growth_axis,
        matrix=matrix,
        diagnostics=diagnostics,
        warnings=warnings,
    )