文件预览

report.py

查看 Founder Signal 技能包中的文件内容。

文件内容

src/founder_signal/report.py

"""Run report rendering for Founder Signal."""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, Mapping

from .review_output import human_review_message, human_review_ready, review_url

REPORT_FILENAME = "REPORT.md"
FAILED_FILENAME = "FAILED.md"
_CANDIDATES_FILENAME = "candidates.json"
_OUTPUTS_CANDIDATES_FILENAME = "outputs/candidates.json"
_SELECTED_CANDIDATE_FILENAME = "selected-candidate.json"
_DAILY_REVIEW_FILENAME = "daily-review.md"
_PUBLIC_RUN_REVIEW_FILENAME = "public-run-review.md"
_SAFE = "safe_to_score_from_this_runtime"
_UNSAFE = "unsafe_to_score_from_this_runtime"


def write_report(*, run_dir: Path, artifact: Mapping[str, Any]) -> Path:
    report_path = run_dir / REPORT_FILENAME
    report_path.write_text(render_report(run_dir=run_dir, artifact=artifact), encoding="utf-8")
    return report_path


def write_failed_marker(*, run_dir: Path, artifact: Mapping[str, Any]) -> Path:
    failed_path = run_dir / FAILED_FILENAME
    failed_path.write_text(render_failed_marker(run_dir=run_dir, artifact=artifact), encoding="utf-8")
    return failed_path


def render_report(*, run_dir: Path, artifact: Mapping[str, Any]) -> str:
    if _is_aggregate_run(artifact):
        return _render_aggregate_report(run_dir=run_dir, artifact=artifact)
    return _render_profile_report(run_dir=run_dir, artifact=artifact)


def render_failed_marker(*, run_dir: Path, artifact: Mapping[str, Any]) -> str:
    lines = [
        "# Founder Signal Run Failed",
        "",
        f"- Run ID: {artifact.get('run_id') or run_dir.name}",
        f"- Run directory: {run_dir}",
        f"- Status: {_report_status(artifact)}",
    ]
    profile_id = str(artifact.get("profile_id") or "").strip()
    if profile_id:
        lines.append(f"- Profile ID: {profile_id}")
    product_name = str(artifact.get("product_name") or "").strip()
    if product_name:
        lines.append(f"- Product: {product_name}")
    daily_review_path = artifact.get("daily_review_path")
    if daily_review_path:
        lines.append(f"- Daily review: {daily_review_path}")
    public_run_review_path = artifact.get("public_run_review_path")
    if public_run_review_path:
        lines.append(f"- Public run review: {public_run_review_path}")
    error = artifact.get("error")
    if error:
        lines.append(f"- Error: {error}")
    failure_message = artifact.get("failure_message")
    if failure_message:
        lines.append(f"- Failure: {failure_message}")
    log_file = artifact.get("log_file")
    if log_file:
        lines.append(f"- Log file: {log_file}")
    stdout = str(artifact.get("publish_stdout") or "").rstrip()
    stderr = str(artifact.get("publish_stderr") or "").rstrip()
    if stdout:
        lines.extend(["", "## stdout", "", "```", stdout, "```"])
    if stderr:
        lines.extend(["", "## stderr", "", "```", stderr, "```"])
    return "\n".join(lines).rstrip() + "\n"


def _render_profile_report(*, run_dir: Path, artifact: Mapping[str, Any]) -> str:
    selected_candidate = _selected_candidate_payload(run_dir=run_dir, artifact=artifact)
    values = [
        ("Run ID", str(artifact.get("run_id") or run_dir.name)),
        ("Run datetime", str(artifact.get("created_at") or "")),
        ("Status", _report_status(artifact)),
        ("Candidates found", str(_candidates_found(run_dir=run_dir, artifact=artifact))),
        ("Platform candidate counts", _multiline_value(_platform_counts(run_dir=run_dir, artifact=artifact))),
        ("Candidates verified", str(_candidates_verified(run_dir=run_dir, artifact=artifact))),
        ("Selected candidate", _selected_candidate(run_dir=run_dir, artifact=artifact)),
        ("Discovery metrics", _multiline_value(_discovery_metrics_lines(artifact))),
        (
            "Selected candidate quality",
            _multiline_value(_selected_candidate_quality_lines(selected_candidate)),
        ),
        (
            "Skipped verified candidate decisions",
            _multiline_value(
                _skipped_verified_candidate_lines(
                    run_dir=run_dir,
                    artifact=artifact,
                    selected_candidate=selected_candidate,
                )
            ),
        ),
        ("Evidence files", _multiline_value(_evidence_files(run_dir=run_dir, artifact=artifact))),
        (
            "Action card generated",
            _yes_no(
                artifact.get("action_card_generated"),
                fallback=(run_dir / _DAILY_REVIEW_FILENAME).exists(),
            ),
        ),
        (
            "Public run review generated",
            _yes_no(
                artifact.get("public_run_review_generated"),
                fallback=(run_dir / _PUBLIC_RUN_REVIEW_FILENAME).exists(),
            ),
        ),
        ("Action card generation mode", str(artifact.get("action_card_generation_mode") or "")),
        (
            "Draft public page published",
            _yes_no(
                artifact.get("draft_public_publish_succeeded"),
                fallback=bool(_draft_url(run_dir=run_dir, artifact=artifact)),
            ),
        ),
        (
            "Draft publish requested",
            _yes_no(
                artifact.get("draft_publish_requested"),
                fallback=bool(artifact.get("draft_publish_intent_path")),
            ),
        ),
        (
            "Draft review-page confirmation required",
            _yes_no(artifact.get("draft_publish_requires_confirmation"), fallback=False),
        ),
        (
            "Draft public publish confirmation required",
            _yes_no(artifact.get("draft_public_publish_requires_confirmation"), fallback=False),
        ),
        ("Draft publish intent", str(artifact.get("draft_publish_intent_path") or "")),
        ("Draft publish artifact", str(artifact.get("draft_publish_artifact_path") or "")),
        ("Draft public URL", _draft_url(run_dir=run_dir, artifact=artifact)),
        (
            "Human review ready",
            _yes_no(artifact.get("human_review_ready"), fallback=human_review_ready(artifact)),
        ),
        (
            "Human review message",
            str(artifact.get("human_review_message") or human_review_message(artifact)),
        ),
        (
            "Failures / rejected candidates",
            _multiline_value(_failures_and_rejections(run_dir=run_dir, artifact=artifact)),
        ),
        ("Safety decision", _safety_decision(artifact)),
    ]

    lines = ["# Founder Signal Run Report", ""]
    profile_id = str(artifact.get("profile_id") or "").strip()
    if profile_id:
        lines.extend(["## Profile ID", "", profile_id, ""])
    product_name = str(artifact.get("product_name") or "").strip()
    if product_name:
        lines.extend(["## Product", "", product_name, ""])
    for heading, value in values:
        lines.extend([f"## {heading}", "", value or "None", ""])
    return "\n".join(lines).rstrip() + "\n"


def _render_aggregate_report(*, run_dir: Path, artifact: Mapping[str, Any]) -> str:
    profile_results = artifact.get("profile_results")
    if not isinstance(profile_results, list):
        profile_results = []
    values = [
        ("Run ID", str(artifact.get("run_id") or run_dir.name)),
        ("Run datetime", str(artifact.get("created_at") or "")),
        ("Status", _report_status(artifact)),
        ("Profiles requested", _multiline_value(_profile_ids(profile_results))),
        ("Profiles run", _multiline_value(_profile_ids(profile_results))),
        ("Profiles skipped", _multiline_value(_skipped_profiles(artifact))),
        ("Candidates found", _multiline_value(_aggregate_candidates(profile_results, "candidates_found"))),
        ("Platform candidate counts", _multiline_value(_aggregate_platform_counts(profile_results))),
        (
            "Candidates verified",
            _multiline_value(_aggregate_candidates(profile_results, "candidates_verified")),
        ),
        (
            "Discovery metrics",
            _multiline_value(_aggregate_discovery_metrics(profile_results, artifact)),
        ),
        ("Selected candidate", _multiline_value(_aggregate_selected(profile_results))),
        ("Evidence files", _multiline_value(_aggregate_evidence(profile_results))),
        (
            "Action card generated",
            _multiline_value(_aggregate_yes_no(profile_results, "action_card_generated")),
        ),
        (
            "Public run review generated",
            _multiline_value(_aggregate_yes_no(profile_results, "public_run_review_generated")),
        ),
        (
            "Action card generation mode",
            _multiline_value(_aggregate_generation_modes(profile_results)),
        ),
        (
            "Draft public page published",
            _multiline_value(_aggregate_yes_no(profile_results, "draft_public_publish_succeeded")),
        ),
        (
            "Draft publish requested",
            _multiline_value(_aggregate_yes_no(profile_results, "draft_publish_requested")),
        ),
        (
            "Draft review-page confirmation required",
            _multiline_value(
                _aggregate_yes_no(profile_results, "draft_publish_requires_confirmation")
            ),
        ),
        (
            "Draft public publish confirmation required",
            _multiline_value(
                _aggregate_yes_no(profile_results, "draft_public_publish_requires_confirmation")
            ),
        ),
        ("Draft publish intent", _multiline_value(_aggregate_intents(profile_results))),
        (
            "Draft publish artifact",
            _multiline_value(_aggregate_artifacts(profile_results)),
        ),
        ("Draft public URL", _multiline_value(_aggregate_urls(profile_results))),
        (
            "Human review ready",
            _multiline_value(_aggregate_yes_no(profile_results, "human_review_ready")),
        ),
        (
            "Human review message",
            _multiline_value(_aggregate_review_messages(profile_results)),
        ),
        (
            "Failures / rejected candidates",
            _multiline_value(_aggregate_failures(profile_results, artifact)),
        ),
        ("Safety decision", _safety_decision(artifact)),
    ]

    lines = ["# Founder Signal Run Report", ""]
    for heading, value in values:
        lines.extend([f"## {heading}", "", value or "None", ""])
    return "\n".join(lines).rstrip() + "\n"


def _is_aggregate_run(artifact: Mapping[str, Any]) -> bool:
    return isinstance(artifact.get("profile_results"), list)


def _profile_ids(profile_results: list[Any]) -> list[str]:
    values: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip()
        if profile_id:
            values.append(profile_id)
    return values


def _skipped_profiles(artifact: Mapping[str, Any]) -> list[str]:
    skipped = artifact.get("profiles_skipped")
    if not isinstance(skipped, list):
        return []
    return [str(item) for item in skipped if str(item).strip()]


def _aggregate_candidates(profile_results: list[Any], key: str) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        items.append(f"{profile_id}: {int(item.get(key) or 0)}")
    return items


def _aggregate_platform_counts(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        counts = item.get("platform_candidate_counts")
        if not isinstance(counts, Mapping):
            continue
        for platform, count in sorted(counts.items()):
            items.append(f"{profile_id}: {platform}: {int(count or 0)}")
    return items


def _aggregate_selected(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        selected = item.get("selected_candidate")
        if isinstance(selected, Mapping):
            label = _format_candidate(selected)
        else:
            label = str(selected or "None")
        items.append(f"{profile_id}: {label}")
    return items


def _aggregate_evidence(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        for evidence_file in item.get("evidence_files") or []:
            items.append(f"{profile_id}: {evidence_file}")
    return items


def _aggregate_generation_modes(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        mode = str(item.get("action_card_generation_mode") or "None").strip() or "None"
        items.append(f"{profile_id}: {mode}")
    return items


def _aggregate_yes_no(profile_results: list[Any], key: str) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        items.append(f"{profile_id}: {_yes_no(item.get(key), fallback=False)}")
    return items


def _aggregate_urls(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        draft_url = review_url(item) or "None"
        items.append(f"{profile_id}: {draft_url}")
    return items


def _aggregate_review_messages(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        message = str(item.get("human_review_message") or human_review_message(item)).strip() or "None"
        items.append(f"{profile_id}: {message}")
    return items


def _aggregate_intents(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        intent_path = str(item.get("draft_publish_intent_path") or "").strip() or "None"
        items.append(f"{profile_id}: {intent_path}")
    return items


def _aggregate_failures(profile_results: list[Any], artifact: Mapping[str, Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        for failure in _failures_and_rejections(run_dir=Path("."), artifact=item):
            items.append(f"{profile_id}: {failure}")
    for failure in artifact.get("failures") or []:
        failure_text = str(failure).strip()
        if failure_text:
            items.append(failure_text)
    return items


def _aggregate_artifacts(profile_results: list[Any]) -> list[str]:
    items: list[str] = []
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        artifact_path = str(item.get("draft_publish_artifact_path") or "").strip() or "None"
        items.append(f"{profile_id}: {artifact_path}")
    return items


def _aggregate_discovery_metrics(profile_results: list[Any], artifact: Mapping[str, Any]) -> list[str]:
    lines = [
        f"total searched pages: {int(_discovery_metrics(artifact).get('searched_pages') or 0)}",
        f"total searched URLs: {len(_discovery_urls(_discovery_metrics(artifact)))}",
        f"total fresh candidates found: {int(_discovery_metrics(artifact).get('fresh_candidates_found') or 0)}",
        f"total excluded by history: {int(_discovery_metrics(artifact).get('excluded_by_history') or 0)}",
        f"total excluded by profile: {int(_discovery_metrics(artifact).get('excluded_by_profile') or 0)}",
        f"total excluded overall: {int(_discovery_metrics(artifact).get('total_excluded') or 0)}",
        f"total discovery budget pages: {int(_discovery_metrics(artifact).get('discovery_budget_pages') or 0)}",
        f"any profile exhausted discovery: {_yes_no(_discovery_metrics(artifact).get('discovery_exhausted'), fallback=False)}",
    ]
    for item in profile_results:
        if not isinstance(item, Mapping):
            continue
        profile_id = str(item.get("profile_id") or "").strip() or "unknown"
        metrics = _discovery_metrics(item)
        lines.append(
            (
                f"{profile_id}: pages={int(metrics.get('searched_pages') or 0)}, "
                f"urls={len(_discovery_urls(metrics))}, "
                f"fresh={int(metrics.get('fresh_candidates_found') or 0)}, "
                f"excluded={int(metrics.get('total_excluded') or 0)}, "
                f"budget={int(metrics.get('discovery_budget_pages') or 0)}, "
                f"exhausted={_yes_no(metrics.get('discovery_exhausted'), fallback=False)}"
            )
        )
    return lines


def _report_status(artifact: Mapping[str, Any]) -> str:
    status = str(artifact.get("status") or "").strip().lower()
    return "failed" if "fail" in status or status == "error" else "success"


def _candidates_found(*, run_dir: Path, artifact: Mapping[str, Any]) -> int:
    if artifact.get("candidates_found") is not None:
        return int(artifact["candidates_found"])
    candidates = _load_candidates(run_dir)
    return len(candidates) if isinstance(candidates, list) else 0


def _candidates_verified(*, run_dir: Path, artifact: Mapping[str, Any]) -> int:
    if artifact.get("candidates_verified") is not None:
        return int(artifact["candidates_verified"])
    candidates = _load_candidates(run_dir)
    if not isinstance(candidates, list):
        return 0
    verified = 0
    for candidate in candidates:
        if not isinstance(candidate, Mapping):
            continue
        if (
            candidate.get("verified") is True
            or str(candidate.get("status") or "").lower() == "verified"
            or str(candidate.get("read_status") or "").lower() == "verified_read_via_mirror"
        ):
            verified += 1
    return verified


def _platform_counts(*, run_dir: Path, artifact: Mapping[str, Any]) -> list[str]:
    counts = artifact.get("platform_candidate_counts")
    if not isinstance(counts, Mapping):
        counts = {}
        candidates = _load_candidates(run_dir)
        if isinstance(candidates, list):
            for candidate in candidates:
                if not isinstance(candidate, Mapping):
                    continue
                platform = str(candidate.get("platform") or candidate.get("source_platform") or "reddit").strip().lower() or "reddit"
                counts[platform] = int(counts.get(platform, 0)) + 1
    return [f"{platform}: {int(count or 0)}" for platform, count in sorted(counts.items())]


def _selected_candidate(*, run_dir: Path, artifact: Mapping[str, Any]) -> str:
    selected = _selected_candidate_payload(run_dir=run_dir, artifact=artifact)
    if isinstance(selected, Mapping):
        return _format_candidate(selected)
    if isinstance(selected, str) and selected.strip():
        return selected.strip()
    selected_value = artifact.get("selected_candidate")
    if isinstance(selected_value, str) and selected_value.strip():
        return selected_value.strip()
    return "None"


def _selected_candidate_payload(*, run_dir: Path, artifact: Mapping[str, Any]) -> Mapping[str, Any] | None:
    selected = artifact.get("selected_candidate")
    if isinstance(selected, Mapping):
        return selected
    selected_file = _load_json(run_dir / _SELECTED_CANDIDATE_FILENAME)
    if isinstance(selected_file, Mapping):
        return selected_file
    return None


def _evidence_files(*, run_dir: Path, artifact: Mapping[str, Any]) -> list[str]:
    evidence_files = artifact.get("evidence_files")
    if isinstance(evidence_files, list):
        return [str(item) for item in evidence_files if str(item).strip()]
    return []


def _draft_url(*, run_dir: Path, artifact: Mapping[str, Any]) -> str:
    draft_url = str(artifact.get("review_url") or "").strip()
    if draft_url:
        return draft_url
    draft_url = review_url(artifact)
    if draft_url:
        return draft_url
    return "None"


def _failures_and_rejections(*, run_dir: Path, artifact: Mapping[str, Any]) -> list[str]:
    items: list[str] = []
    failures = artifact.get("failures")
    if isinstance(failures, list):
        items.extend(str(item) for item in failures if str(item).strip())
    rejected = artifact.get("rejected_candidates")
    if isinstance(rejected, list):
        items.extend(str(item) for item in rejected if str(item).strip())
    error = str(artifact.get("error") or "").strip()
    if error:
        items.append(error)
    daily_review_path = str(artifact.get("daily_review_path") or "").strip()
    if daily_review_path:
        items.append(f"Preserved daily review: {daily_review_path}")
    public_run_review_path = str(artifact.get("public_run_review_path") or "").strip()
    if public_run_review_path:
        items.append(f"Public run review: {public_run_review_path}")
    deduped: list[str] = []
    seen: set[str] = set()
    for item in items:
        if item in seen:
            continue
        seen.add(item)
        deduped.append(item)
    return deduped


def _discovery_metrics(artifact: Mapping[str, Any]) -> Mapping[str, Any]:
    metrics = artifact.get("discovery_metrics")
    if isinstance(metrics, Mapping):
        return metrics
    return {}


def _discovery_urls(metrics: Mapping[str, Any]) -> list[str]:
    urls = metrics.get("searched_urls")
    if not isinstance(urls, list):
        return []
    return [str(item) for item in urls if str(item).strip()]


def _discovery_metrics_lines(artifact: Mapping[str, Any]) -> list[str]:
    metrics = _discovery_metrics(artifact)
    if not metrics:
        return []
    urls = _discovery_urls(metrics)
    return [
        f"searched pages: {int(metrics.get('searched_pages') or 0)}",
        f"searched URLs: {len(urls)}",
        f"fresh candidates found: {int(metrics.get('fresh_candidates_found') or 0)}",
        f"excluded by history: {int(metrics.get('excluded_by_history') or 0)}",
        f"excluded by profile: {int(metrics.get('excluded_by_profile') or 0)}",
        f"total excluded: {int(metrics.get('total_excluded') or 0)}",
        f"discovery budget pages: {int(metrics.get('discovery_budget_pages') or 0)}",
        f"discovery exhausted: {_yes_no(metrics.get('discovery_exhausted'), fallback=False)}",
    ]


def _selected_candidate_quality_lines(selected_candidate: Mapping[str, Any] | None) -> list[str]:
    if not isinstance(selected_candidate, Mapping):
        return []
    structured = selected_candidate.get("structured_evidence")
    score_breakdown = selected_candidate.get("score_breakdown")
    agent_review = selected_candidate.get("agent_review")
    lines: list[str] = []
    extraction_quality = ""
    if isinstance(structured, Mapping):
        extraction_quality = str(structured.get("extraction_quality") or "").strip()
    if extraction_quality:
        lines.append(f"structured extraction quality: {extraction_quality}")
    if isinstance(structured, Mapping) and structured.get("post_age_days") is not None:
        lines.append(f"post age days: {int(structured.get('post_age_days') or 0)}")
    relevance_line = _deterministic_relevance_line(score_breakdown)
    if relevance_line:
        lines.append(relevance_line)
    if isinstance(agent_review, Mapping):
        score_total = int(selected_candidate.get("agent_review_score_total") or 0)
        lines.append(f"agent review score: {score_total}/25")
        lines.extend(_agent_review_lines(agent_review))
    selection_eligible = selected_candidate.get("selection_eligible")
    if selection_eligible is not None:
        lines.append(
            f"selection eligible: {_yes_no(selection_eligible, fallback=False)}"
        )
    gate_reason = str(selected_candidate.get("selection_gate_reason") or "").strip()
    lines.append(f"selection gate reason: {gate_reason or 'passed'}")
    rejection_signals = _string_list(selected_candidate.get("rejection_signals"))
    lines.append(
        "rejection signals: "
        + (", ".join(rejection_signals) if rejection_signals else "none")
    )
    return lines


def _skipped_verified_candidate_lines(
    *,
    run_dir: Path,
    artifact: Mapping[str, Any],
    selected_candidate: Mapping[str, Any] | None,
) -> list[str]:
    selected_candidate_id = ""
    if isinstance(selected_candidate, Mapping):
        selected_candidate_id = str(selected_candidate.get("candidate_id") or "").strip()
    items: list[str] = []
    candidates = _load_candidates(run_dir)
    if not isinstance(candidates, list):
        return items
    for candidate in candidates:
        if not isinstance(candidate, Mapping):
            continue
        if not _is_verified_candidate(candidate):
            continue
        candidate_id = str(candidate.get("candidate_id") or "").strip()
        if candidate_id and candidate_id == selected_candidate_id:
            continue
        reasons = _candidate_skip_reasons(candidate)
        if not reasons:
            continue
        label = _format_candidate(candidate)
        items.append(f"{label}: {'; '.join(reasons)}")
    return items


def _is_verified_candidate(candidate: Mapping[str, Any]) -> bool:
    if candidate.get("verified") is True:
        return True
    if str(candidate.get("status") or "").strip().lower() == "verified":
        return True
    read_status = str(candidate.get("read_status") or "").strip().lower()
    if read_status.startswith("verified"):
        return True
    return False


def _candidate_skip_reasons(candidate: Mapping[str, Any]) -> list[str]:
    reasons: list[str] = []
    gate_reason = str(candidate.get("selection_gate_reason") or "").strip()
    if gate_reason:
        reasons.append(f"selection gate: {_humanize_key(gate_reason)}")
    rejection_signals = _string_list(candidate.get("rejection_signals"))
    if rejection_signals:
        reasons.append(f"rejection signals: {', '.join(rejection_signals)}")
    agent_review = candidate.get("agent_review")
    if isinstance(agent_review, Mapping):
        reject_reason = str(agent_review.get("reject_reason") or "").strip()
        if reject_reason:
            reasons.append(f"agent reject reason: {reject_reason}")
        reason = str(agent_review.get("reason") or "").strip()
        if reason and gate_reason:
            reasons.append(f"agent review reason: {reason}")
    return reasons


def _deterministic_relevance_line(score_breakdown: Any) -> str:
    if not isinstance(score_breakdown, Mapping):
        return ""
    title_relevance = score_breakdown.get("title_relevance")
    body_relevance = score_breakdown.get("body_relevance")
    relevance_total = score_breakdown.get("relevance_to_draft")
    if title_relevance is not None or body_relevance is not None:
        title_value = int(title_relevance or 0)
        body_value = int(body_relevance or 0)
        if relevance_total is not None:
            return (
                "deterministic title/body relevance: "
                f"title {title_value}, body {body_value}, total {int(relevance_total or 0)}"
            )
        return f"deterministic title/body relevance: title {title_value}, body {body_value}"
    if relevance_total is not None:
        return f"deterministic title/body relevance: overall {int(relevance_total or 0)}"
    return ""


def _agent_review_lines(agent_review: Mapping[str, Any]) -> list[str]:
    metrics = []
    for key in (
        "profile_fit",
        "audience_match",
        "pain_relevance",
        "reply_opportunity",
        "confidence",
    ):
        if key in agent_review:
            metrics.append(f"{_humanize_key(key)} {int(agent_review.get(key) or 0)}")
    lines: list[str] = []
    if metrics:
        lines.append("agent review components: " + ", ".join(metrics))
    reason = str(agent_review.get("reason") or "").strip()
    if reason:
        lines.append(f"agent review reason: {reason}")
    reject_reason = str(agent_review.get("reject_reason") or "").strip()
    if reject_reason:
        lines.append(f"agent reject reason: {reject_reason}")
    return lines


def _string_list(value: Any) -> list[str]:
    if not isinstance(value, list):
        return []
    return [str(item) for item in value if str(item).strip()]


def _humanize_key(value: str) -> str:
    return value.replace("_", " ").strip()


def _safety_decision(artifact: Mapping[str, Any]) -> str:
    value = str(artifact.get("safety_decision") or "").strip()
    if value in {_SAFE, _UNSAFE}:
        return value
    return _UNSAFE


def _yes_no(value: Any, *, fallback: bool) -> str:
    if value is None:
        return "yes" if fallback else "no"
    return "yes" if bool(value) else "no"


def _multiline_value(items: list[str]) -> str:
    if not items:
        return "None"
    return "\n".join(f"- {item}" for item in items)


def _format_candidate(candidate: Mapping[str, Any]) -> str:
    for key in ("candidate_id", "id", "username", "title", "url"):
        value = str(candidate.get(key) or "").strip()
        if value:
            return value
    return json.dumps(dict(candidate), sort_keys=True)


def _load_json(path: Path) -> Any:
    if not path.exists():
        return None
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        return None


def _load_candidates(run_dir: Path) -> Any:
    candidates = _load_json(run_dir / _OUTPUTS_CANDIDATES_FILENAME)
    if candidates is not None:
        return candidates
    return _load_json(run_dir / _CANDIDATES_FILENAME)