文件预览

optimizer.py

查看 YouOS 技能包中的文件内容。

文件内容

app/autoresearch/optimizer.py

"""Autoresearch optimization loop for YouOS."""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from app.autoresearch.mutator import (
    ConfigSurface,
    apply_mutation,
    describe_mutation,
    get_mutable_surfaces,
    revert_mutation,
)
from app.autoresearch.run_log import ensure_table, log_iteration
from app.autoresearch.scorer import (
    Scorecard,
    compare_scorecards,
    draft_quality_case_weights,
    draft_quality_weighting_enabled,
    scorecard_from_eval_result,
)
from app.evaluation.service import EvalRequest, run_eval_suite


@dataclass
class IterationResult:
    iteration: int
    surface_name: str
    mutation_desc: str
    baseline_composite: float
    candidate_composite: float
    outcome: str  # "improved" | "neutral" | "regressed"
    kept: bool


@dataclass
class AutoresearchReport:
    run_tag: str
    started_at: str
    baseline: Scorecard
    final: Scorecard
    iterations: list[IterationResult] = field(default_factory=list)
    total_eval_runs: int = 0
    improvements_kept: int = 0
    reverted: int = 0


def run_autoresearch(
    configs_dir: Path,
    database_url: str,
    *,
    generate_fn: Any,
    max_iterations: int = 10,
    baseline_tag: str = "autoresearch_baseline",
    dry_run: bool = False,
    surface_filter: str | None = None,
) -> AutoresearchReport:
    """Run the autoresearch optimization loop.

    Args:
        configs_dir: Path to configs/ directory.
        database_url: SQLite database URL.
        generate_fn: Generation function matching the eval runner interface.
        max_iterations: Max total eval runs (including baseline).
        baseline_tag: Config tag for the baseline run.
        dry_run: If True, show plan without executing.
        surface_filter: Optional "retrieval" or "prompt_drafting" to limit scope.
    """
    run_tag = f"autoresearch_{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}"
    started_at = datetime.now(timezone.utc).isoformat()

    surfaces = get_mutable_surfaces(configs_dir, surface_filter=surface_filter)

    if dry_run:
        return _dry_run_report(surfaces, run_tag, started_at)

    # Ensure logging table exists
    ensure_table(database_url)

    # Draft-quality case weights (default off): computed ONCE from the current
    # draft_events log and applied to every scorecard this run, so baseline and
    # candidates stay comparable. Pushes the objective toward the cohorts where
    # real drafts get edited most. No data / disabled → None → equal weighting.
    case_weights: dict[str, float] | None = None
    try:
        if draft_quality_weighting_enabled(configs_dir):
            from app.core.stats import summarize_draft_events

            case_weights = draft_quality_case_weights(summarize_draft_events(database_url)) or None
    except Exception:
        case_weights = None

    # 1. Establish baseline
    baseline_result = run_eval_suite(
        EvalRequest(config_tag=f"{run_tag}_baseline"),
        generate_fn=generate_fn,
        database_url=database_url,
        configs_dir=configs_dir,
        persist=True,
    )
    baseline = scorecard_from_eval_result(baseline_result, configs_dir, case_weights=case_weights)
    eval_count = 1
    current_baseline = baseline

    report = AutoresearchReport(
        run_tag=run_tag,
        started_at=started_at,
        baseline=baseline,
        final=baseline,
    )

    # 2. Iterate over surfaces
    for surface in surfaces:
        if eval_count >= max_iterations:
            break

        mutation_desc = describe_mutation(surface)

        # Skip if at boundary
        if "at boundary" in mutation_desc:
            continue

        old_value = apply_mutation(surface, configs_dir)
        if old_value == surface.current_value:
            # No actual change (boundary)
            continue

        # Run eval with mutated config
        candidate_result = run_eval_suite(
            EvalRequest(config_tag=f"{run_tag}_iter{eval_count}"),
            generate_fn=generate_fn,
            database_url=database_url,
            configs_dir=configs_dir,
            persist=True,
        )
        eval_count += 1
        candidate = scorecard_from_eval_result(candidate_result, configs_dir, case_weights=case_weights)
        outcome = compare_scorecards(current_baseline, candidate)

        kept = outcome == "improved"
        if not kept:
            revert_mutation(surface, old_value, configs_dir)

        iteration = IterationResult(
            iteration=eval_count - 1,
            surface_name=surface.name,
            mutation_desc=mutation_desc,
            baseline_composite=current_baseline.composite,
            candidate_composite=candidate.composite,
            outcome=outcome,
            kept=kept,
        )
        report.iterations.append(iteration)

        log_iteration(
            database_url,
            run_tag=run_tag,
            iteration=eval_count - 1,
            surface_name=surface.name,
            mutation_desc=mutation_desc,
            baseline_composite=current_baseline.composite,
            candidate_composite=candidate.composite,
            outcome=outcome,
            kept=kept,
        )

        if kept:
            current_baseline = candidate
            report.improvements_kept += 1
        else:
            report.reverted += 1

    report.total_eval_runs = eval_count
    report.final = current_baseline

    # Write structured JSON run log
    _write_jsonl_entry(report, configs_dir)

    return report


def _write_jsonl_entry(report: AutoresearchReport, configs_dir: Path) -> None:
    """Append a JSON line to var/autoresearch_runs.jsonl."""
    root = configs_dir.parent
    jsonl_path = root / "var" / "autoresearch_runs.jsonl"
    jsonl_path.parent.mkdir(parents=True, exist_ok=True)

    improvements = [it.surface_name for it in report.iterations if it.kept]
    regressions = [it.surface_name for it in report.iterations if it.outcome == "regressed"]

    entry = {
        "run_at": report.started_at,
        "iterations": report.total_eval_runs,
        "composite_score": report.final.composite,
        "improvements": improvements,
        "regressions": regressions,
        "config_snapshot": {
            "baseline_composite": report.baseline.composite,
            "final_composite": report.final.composite,
            "improvements_kept": report.improvements_kept,
            "reverted": report.reverted,
        },
    }
    with open(jsonl_path, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry, default=str) + "\n")


def _dry_run_report(
    surfaces: list[ConfigSurface],
    run_tag: str,
    started_at: str,
) -> AutoresearchReport:
    """Build a report showing what would be mutated without doing it."""
    dummy = Scorecard(
        config_tag="dry_run",
        pass_rate=0.0,
        warn_rate=0.0,
        fail_rate=0.0,
        avg_keyword_hit=0.0,
        avg_confidence=0.0,
        composite=0.0,
    )
    report = AutoresearchReport(
        run_tag=run_tag,
        started_at=started_at,
        baseline=dummy,
        final=dummy,
    )
    for surface in surfaces:
        desc = describe_mutation(surface)
        report.iterations.append(
            IterationResult(
                iteration=0,
                surface_name=surface.name,
                mutation_desc=desc,
                baseline_composite=0.0,
                candidate_composite=0.0,
                outcome="dry_run",
                kept=False,
            )
        )
    return report


def format_report(report: AutoresearchReport) -> str:
    """Format an autoresearch report for terminal output."""
    lines: list[str] = []
    lines.append(f"YouOS Autoresearch — {report.started_at}")
    lines.append("━" * 50)

    if report.baseline.composite > 0 or report.final.composite > 0:
        lines.append(f"Baseline: {report.baseline.summary()}")
        lines.append("")

    for it in report.iterations:
        prefix = f"[{it.iteration}/{report.total_eval_runs or len(report.iterations)}]"
        if it.outcome == "dry_run":
            lines.append(f"  {it.mutation_desc}")
        elif it.outcome == "improved":
            lines.append(f"{prefix} Mutating {it.mutation_desc}\n  Improved: composite {it.baseline_composite:.2f} -> {it.candidate_composite:.2f} — keeping")
        elif it.outcome == "neutral":
            lines.append(f"{prefix} Mutating {it.mutation_desc}\n  Neutral: composite {it.baseline_composite:.2f} -> {it.candidate_composite:.2f} — reverting")
        else:
            lines.append(
                f"{prefix} Mutating {it.mutation_desc}\n  Regressed: composite {it.baseline_composite:.2f} -> {it.candidate_composite:.2f} — reverting"
            )

    lines.append("━" * 50)

    if report.total_eval_runs > 0:
        lines.append(f"Final: {report.final.summary()}")
        lines.append(f"Improvements kept: {report.improvements_kept} | Reverted: {report.reverted} | Iterations: {report.total_eval_runs}")
    else:
        lines.append(f"Dry run: {len(report.iterations)} surfaces would be mutated")

    return "\n".join(lines)