文件预览

stats.py

查看 YouOS 技能包中的文件内容。

文件内容

app/core/stats.py

"""Unified stats query layer for YouOS."""

from __future__ import annotations

import json
import os
import sqlite3
from pathlib import Path

ROOT_DIR = Path(__file__).resolve().parents[2]


def _get_var_path(filename: str) -> Path:
    from app.core.settings import get_var_dir

    return get_var_dir() / filename


def _resolve_adapter_path() -> Path:
    from app.core.settings import get_adapter_path

    return get_adapter_path()


ADAPTER_PATH = _resolve_adapter_path()
AUTORESEARCH_JSONL = _get_var_path("autoresearch_runs.jsonl")
AUTORESEARCH_LOG = _get_var_path("autoresearch_log.md")


def _safe_count(conn: sqlite3.Connection, table: str) -> int:
    try:
        return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]  # noqa: S608
    except sqlite3.OperationalError:
        return 0


def _group_counts(conn: sqlite3.Connection, column: str, *, default: str) -> dict[str, int]:
    """COUNT(*) grouped by a draft_events column (NULLs folded to *default*)."""
    try:
        rows = conn.execute(
            f"SELECT COALESCE({column}, ?) AS k, COUNT(*) AS n FROM draft_events GROUP BY k ORDER BY n DESC",  # noqa: S608
            (default,),
        ).fetchall()
        return {str(r["k"]): int(r["n"]) for r in rows}
    except sqlite3.OperationalError:
        return {}


def summarize_draft_events(database_url: str) -> dict:
    """Aggregate the ``draft_events`` signal log into a draft-quality picture.

    Every generated draft is logged with the *conditions* it was produced
    under (intent, sender_type, confidence, length_flag). This summarizes them
    so the loop can see *where* drafting is weak — e.g. an intent whose drafts
    are frequently off the target length, or a cohort that draws mostly
    low-confidence retrieval. Where a draft can be matched to an edit outcome
    (a best-effort join to ``draft_history`` on inbound+draft text), it also
    reports the average edit distance by condition. The model's own drafts are
    never training targets; this is analysis/observability for the loop.
    """
    from app.db.bootstrap import resolve_sqlite_path

    empty = {
        "total": 0,
        "by_intent": {},
        "by_sender_type": {},
        "by_confidence": {},
        "by_length_flag": {},
        "by_model": {},
        "off_target_pct": None,
        "outcome": {"matched": 0, "avg_edit_distance_by_sender_type": {}, "avg_edit_distance_by_confidence": {}},
    }

    db_path = resolve_sqlite_path(database_url)
    if not db_path.exists():
        return empty

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        total = _safe_count(conn, "draft_events")
        if total == 0:
            return empty

        summary = {
            "total": total,
            "by_intent": _group_counts(conn, "intent", default="unknown"),
            "by_sender_type": _group_counts(conn, "sender_type", default="unknown"),
            "by_confidence": _group_counts(conn, "confidence", default="unknown"),
            "by_length_flag": _group_counts(conn, "length_flag", default="none"),
            # Which model actually produced each draft — the source of truth for
            # whether the LoRA adapter is really in use vs. a silent base/cloud
            # fallback. See get_drafting_model_status().
            "by_model": _group_counts(conn, "model_used", default="unknown"),
            "off_target_pct": None,
            "outcome": {"matched": 0, "avg_edit_distance_by_sender_type": {}, "avg_edit_distance_by_confidence": {}},
        }

        # Fraction of length-annotated drafts that missed the target band.
        try:
            off, flagged = conn.execute(
                """SELECT
                       SUM(CASE WHEN length_flag IN ('long', 'short') THEN 1 ELSE 0 END),
                       SUM(CASE WHEN length_flag IS NOT NULL THEN 1 ELSE 0 END)
                   FROM draft_events"""
            ).fetchone()
            if flagged:
                summary["off_target_pct"] = round(100.0 * (off or 0) / flagged, 1)
        except sqlite3.OperationalError:
            pass

        # Best-effort outcome correlation: join to draft_history on the only
        # available linkage (inbound + draft text). Not unique — same draft can
        # recur — so this is indicative, not exact; `matched` reports coverage.
        for key, col in (("avg_edit_distance_by_sender_type", "sender_type"), ("avg_edit_distance_by_confidence", "confidence")):
            try:
                rows = conn.execute(
                    f"""SELECT COALESCE(de.{col}, 'unknown') AS k,
                               ROUND(AVG(dh.edit_distance_pct), 3) AS avg_ed,
                               COUNT(*) AS n
                        FROM draft_events de
                        JOIN draft_history dh
                          ON de.inbound_text = dh.inbound_text
                         AND de.generated_draft = dh.generated_draft
                        WHERE dh.edit_distance_pct IS NOT NULL
                        GROUP BY k""",  # noqa: S608
                ).fetchall()
                summary["outcome"][key] = {str(r["k"]): {"avg_edit_distance": r["avg_ed"], "n": int(r["n"])} for r in rows}
            except sqlite3.OperationalError:
                pass

        try:
            matched = conn.execute(
                """SELECT COUNT(*) FROM draft_events de
                   JOIN draft_history dh
                     ON de.inbound_text = dh.inbound_text AND de.generated_draft = dh.generated_draft
                   WHERE dh.edit_distance_pct IS NOT NULL"""
            ).fetchone()[0]
            summary["outcome"]["matched"] = int(matched)
        except sqlite3.OperationalError:
            pass

        return summary
    finally:
        conn.close()


def get_latest_ingest_status(database_url: str) -> dict:
    """Latest ingestion run's status, for the wizard's live progress poll.

    Maps the run-log's ``started`` to ``running``; returns ``idle`` when there's
    no run (or no table yet).
    """
    from app.db.bootstrap import resolve_sqlite_path

    db_path = resolve_sqlite_path(database_url)
    if not db_path.exists():
        return {"status": "idle"}
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        row = conn.execute(
            """SELECT status, started_at, completed_at, discovered_count, fetched_count,
                      stored_reply_pair_count, error_summary
               FROM ingest_runs ORDER BY id DESC LIMIT 1"""
        ).fetchone()
    except sqlite3.OperationalError:
        return {"status": "idle"}
    finally:
        conn.close()
    if not row:
        return {"status": "idle"}
    return {
        "status": "running" if row["status"] == "started" else row["status"],
        "started_at": row["started_at"],
        "completed_at": row["completed_at"],
        "discovered": row["discovered_count"],
        "fetched": row["fetched_count"],
        "reply_pairs": row["stored_reply_pair_count"],
        "error": row["error_summary"],
    }


def get_corpus_stats(database_url: str) -> dict:
    """Get corpus health statistics."""
    from app.db.bootstrap import resolve_sqlite_path

    db_path = resolve_sqlite_path(database_url)
    if not db_path.exists():
        return {
            "total_documents": 0,
            "total_reply_pairs": 0,
            "total_feedback_pairs": 0,
            "reviewed_today": 0,
            "reviewed_this_week": 0,
            "avg_edit_distance": None,
            "embedding_pct": None,
            "outcome_metrics": {
                "accept_unchanged_pct": None,
                "low_edit_pct": None,
                "high_rating_pct": None,
                "median_edit_distance": None,
            },
        }

    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        total_docs = _safe_count(conn, "documents")
        total_pairs = _safe_count(conn, "reply_pairs")
        total_feedback = _safe_count(conn, "feedback_pairs")

        reviewed_today = 0
        reviewed_week = 0
        avg_edit_dist = None
        try:
            reviewed_today = conn.execute("SELECT COUNT(*) FROM feedback_pairs WHERE DATE(created_at) = DATE('now')").fetchone()[0]
            reviewed_week = conn.execute("SELECT COUNT(*) FROM feedback_pairs WHERE created_at >= DATE('now', '-7 days')").fetchone()[0]
            row = conn.execute(
                "SELECT AVG(edit_distance_pct) FROM "
                "(SELECT edit_distance_pct FROM feedback_pairs "
                "WHERE edit_distance_pct IS NOT NULL ORDER BY id DESC LIMIT 50)"
            ).fetchone()
            if row and row[0] is not None:
                avg_edit_dist = round(row[0], 4)
        except sqlite3.OperationalError:
            pass

        embedding_pct = None
        try:
            if total_pairs > 0:
                with_emb = conn.execute("SELECT COUNT(*) FROM reply_pairs WHERE embedding IS NOT NULL").fetchone()[0]
                embedding_pct = round((with_emb / total_pairs) * 100, 1)
        except sqlite3.OperationalError:
            pass

        # Outcome metrics (proxy for real-world draft quality)
        outcome_metrics = {
            "accept_unchanged_pct": None,
            "low_edit_pct": None,
            "high_rating_pct": None,
            "median_edit_distance": None,
        }
        try:
            row = conn.execute(
                """
                SELECT
                    ROUND(100.0 * AVG(CASE WHEN edit_distance_pct <= 0.01 THEN 1.0 ELSE 0.0 END), 1) AS accept_unchanged_pct,
                    ROUND(100.0 * AVG(CASE WHEN edit_distance_pct <= 0.15 THEN 1.0 ELSE 0.0 END), 1) AS low_edit_pct,
                    ROUND(100.0 * AVG(CASE WHEN rating >= 4 THEN 1.0 ELSE 0.0 END), 1) AS high_rating_pct
                FROM feedback_pairs
                WHERE edit_distance_pct IS NOT NULL
                """
            ).fetchone()
            if row:
                outcome_metrics["accept_unchanged_pct"] = row[0]
                outcome_metrics["low_edit_pct"] = row[1]
                outcome_metrics["high_rating_pct"] = row[2]

            # Median edit distance from last 100 feedback rows
            med_row = conn.execute(
                """
                SELECT edit_distance_pct
                FROM feedback_pairs
                WHERE edit_distance_pct IS NOT NULL
                ORDER BY id DESC
                LIMIT 100
                """
            ).fetchall()
            if med_row:
                vals = sorted(float(r[0]) for r in med_row)
                n = len(vals)
                if n % 2 == 1:
                    median_val = vals[n // 2]
                else:
                    median_val = (vals[(n // 2) - 1] + vals[n // 2]) / 2
                outcome_metrics["median_edit_distance"] = round(median_val, 4)
        except sqlite3.OperationalError:
            pass

        return {
            "total_documents": total_docs,
            "total_reply_pairs": total_pairs,
            "total_feedback_pairs": total_feedback,
            "reviewed_today": reviewed_today,
            "reviewed_this_week": reviewed_week,
            "avg_edit_distance": avg_edit_dist,
            "embedding_pct": embedding_pct,
            "outcome_metrics": outcome_metrics,
        }
    finally:
        conn.close()


def get_model_status(configs_dir: Path) -> dict:
    """Get model and adapter status."""
    adapter_exists = (ADAPTER_PATH / "adapters.safetensors").exists()
    lora_trained_at = None
    if adapter_exists:
        try:
            from datetime import datetime, timezone

            mtime = os.path.getmtime(ADAPTER_PATH / "adapters.safetensors")
            lora_trained_at = datetime.fromtimestamp(mtime, tz=timezone.utc).isoformat()
        except Exception:
            pass

    # Capability-aware (not just "does the adapter file exist"): without mlx_lm
    # the local model can't run at all, so drafting falls to the cloud — claiming
    # "lora" there is the false-confidence we're trying to avoid.
    import shutil

    local_available = shutil.which("mlx_lm") is not None
    if not local_available:
        gen_model = "claude"
    elif adapter_exists:
        gen_model = "qwen2.5-1.5b-lora"
    else:
        gen_model = "qwen2.5-1.5b-base"

    # Benchmark trend
    benchmark_trend: list[dict] = []
    if AUTORESEARCH_JSONL.exists():
        try:
            lines = AUTORESEARCH_JSONL.read_text(encoding="utf-8").strip().splitlines()
            for line in lines[-5:]:
                entry = json.loads(line)
                benchmark_trend.append(
                    {
                        "date": entry.get("run_at", ""),
                        "composite_score": entry.get("composite_score"),
                        "improvements_kept": entry.get("config_snapshot", {}).get("improvements_kept"),
                    }
                )
        except Exception:
            benchmark_trend = []
    if not benchmark_trend and AUTORESEARCH_LOG.exists():
        try:
            import re

            log_text = AUTORESEARCH_LOG.read_text(encoding="utf-8")
            entries = re.findall(r"## Run (\d{4}-\d{2}-\d{2}[^\n]*)\n(.*?)(?=\n## Run |\Z)", log_text, re.DOTALL)
            for date_str, body in entries[-5:]:
                score_match = re.search(r"composite[_\s]?score[:\s]*([\d.]+)", body, re.IGNORECASE)
                kept_match = re.search(r"improvements?\s*kept[:\s]*(\d+)", body, re.IGNORECASE)
                benchmark_trend.append(
                    {
                        "date": date_str.strip(),
                        "composite_score": score_match.group(1) if score_match else None,
                        "improvements_kept": int(kept_match.group(1)) if kept_match else None,
                    }
                )
        except Exception:
            pass

    return {
        "generation_model": gen_model,
        "local_available": local_available,
        "lora_adapter_exists": adapter_exists,
        "lora_trained_at": lora_trained_at,
        "last_finetune_run": lora_trained_at,
        "benchmark_trend": benchmark_trend,
    }


def _recent_model_counts(conn: sqlite3.Connection, limit: int = 50) -> dict[str, int]:
    """``{model_used: count}`` over the most recent *limit* drafts.

    Recent-only because an adapter trained today shouldn't be judged by drafts
    produced weeks ago on the base model.
    """
    try:
        rows = conn.execute(
            "SELECT COALESCE(model_used, 'unknown') AS k, COUNT(*) AS n FROM "
            "(SELECT model_used FROM draft_events ORDER BY id DESC LIMIT ?) GROUP BY k",
            (limit,),
        ).fetchall()
        return {str(r["k"]): int(r["n"]) for r in rows}
    except sqlite3.OperationalError:
        return {}


def _classify_model_used(model_used: str) -> str:
    """Bucket a raw ``model_used`` label into lora | base | cloud | other."""
    m = (model_used or "").lower()
    if "lora" in m:  # qwen2.5-1.5b-lora and per-persona qwen2.5-1.5b-lora-<type>
        return "lora"
    if m.endswith("-base"):
        return "base"
    if m == "claude" or m.startswith("ollama"):
        return "cloud"
    return "other"


def _classify_drafting(by_model: dict[str, int], adapter_trained: bool, local_available: bool) -> tuple[str, str, str, bool]:
    """Decide what's *actually* drafting → (state, label, detail, healthy).

    Prefers reality (what recent drafts used); falls back to capability (adapter
    on disk + mlx_lm) when there are no drafts yet. ``healthy`` is False whenever
    the LoRA isn't the thing drafting — that's the signal a surface should warn on.
    """
    totals = {"lora": 0, "base": 0, "cloud": 0, "other": 0}
    for model, n in by_model.items():
        totals[_classify_model_used(model)] += n
    n = sum(totals.values())

    if n:
        lora, base, cloud = totals["lora"], totals["base"], totals["cloud"]
        if lora and not base and not cloud:
            return ("personalized", "Your fine-tuned model (LoRA)",
                    f"All of the last {n} drafts used your trained LoRA adapter.", True)
        if lora and (base or cloud):
            return ("mixed", "Mostly your LoRA — some fell back",
                    f"{base} base-model and {cloud} cloud-fallback draft(s) in the last {n}. "
                    "Check that mlx_lm is installed and the adapter is trained.", False)
        if base and not lora:
            return ("base", "Base model — your LoRA is NOT in use",
                    "Recent drafts ran on the base model. Train an adapter with `youos finetune` "
                    "(or via the wizard) so drafts sound like you.", False)
        if cloud and not lora:
            return ("cloud", "Cloud fallback — not your local LoRA",
                    "Recent drafts used the cloud/Ollama fallback, not your local model. "
                    "Is mlx_lm installed and an adapter trained?", False)
        return ("unknown", "Unknown", f"The last {n} drafts have no recognizable model label.", False)

    # No drafts yet — infer from capability.
    if not local_available:
        return ("cloud", "Cloud fallback (local model unavailable)",
                'mlx_lm is not installed, so the local model can\'t run — drafts will use the cloud '
                'fallback. Install it: pip install -e ".[mlx]".', False)
    if adapter_trained:
        return ("personalized", "Your fine-tuned model (LoRA) — ready",
                "Adapter trained and the local model is available; drafts will use your LoRA. No drafts yet.", True)
    return ("base", "Base model — no LoRA trained yet",
            "No adapter trained yet — drafts will use the base model (not personalized) until you fine-tune.", False)


_READINESS_MESSAGES = {
    "not_started": "Your voice model hasn't been trained yet — drafts use the base model and won't sound like you. "
                   "Start fine-tuning from the setup wizard or Settings.",
    "training": "Training your voice model on your sent mail… drafts use the base model until it's ready.",
    "benchmarking": "Benchmarking your newly trained voice model… almost there.",
    "benchmark_pending": "Your voice model is trained but not yet benchmarked — it'll be validated by tonight's run "
                         "(or run `youos eval --golden`). Drafts may not reflect the validated model yet.",
    "ready": "Your voice model is trained and benchmarked — drafts now sound like you.",
}


def get_model_readiness(database_url: str, *, finetune_running: bool = False) -> dict:
    """Is the personalized model ready to rely on — i.e. trained AND benchmarked?

    Used to ask a user to wait before drafting on a half-baked model. Phases:
    ``not_started`` → ``training`` → ``benchmarking`` → ``benchmark_pending`` →
    ``ready``. "Benchmarked" means a golden eval ran at or after the adapter was
    trained (the wizard's fine-tune now chains the eval, so this is reachable
    without waiting for the nightly). ``finetune_running`` is supplied by the
    route layer since the in-progress handle lives there.
    """
    adapter_file = _resolve_adapter_path() / "adapters.safetensors"
    adapter_trained = adapter_file.exists()

    benchmarked = False
    if adapter_trained:
        golden = _get_var_path("golden_results.json")
        try:
            benchmarked = golden.exists() and golden.stat().st_mtime >= adapter_file.stat().st_mtime
        except OSError:
            benchmarked = False

    if finetune_running:
        phase = "benchmarking" if adapter_trained else "training"
    elif not adapter_trained:
        phase = "not_started"
    elif not benchmarked:
        phase = "benchmark_pending"
    else:
        phase = "ready"

    return {
        "phase": phase,
        "ready": phase == "ready",
        "message": _READINESS_MESSAGES[phase],
        "adapter_trained": adapter_trained,
        "benchmarked": benchmarked,
        "running": finetune_running,
    }


def get_drafting_model_status(database_url: str) -> dict:
    """What model is *actually* drafting — to prevent the silent-failure where a
    user believes drafts are personalized while they run on the base model or
    fall back to the cloud.

    Reality first (recent ``draft_events.model_used``), capability as the fallback
    (adapter on disk + ``mlx_lm`` available).
    """
    import shutil

    from app.db.bootstrap import resolve_sqlite_path

    adapter_trained = (_resolve_adapter_path() / "adapters.safetensors").exists()
    local_available = shutil.which("mlx_lm") is not None

    by_model: dict[str, int] = {}
    db_path = resolve_sqlite_path(database_url)
    if db_path.exists():
        conn = sqlite3.connect(db_path)
        conn.row_factory = sqlite3.Row
        try:
            by_model = _recent_model_counts(conn)
        finally:
            conn.close()

    state, label, detail, healthy = _classify_drafting(by_model, adapter_trained, local_available)
    return {
        "state": state,
        "label": label,
        "detail": detail,
        "healthy": healthy,
        "adapter_trained": adapter_trained,
        "local_available": local_available,
        "recent_by_model": by_model,
    }


def get_pipeline_status(project_root: Path) -> dict | None:
    """Read var/pipeline_last_run.json if it exists."""
    log_path = project_root / "var" / "pipeline_last_run.json"
    if not log_path.exists():
        return None
    try:
        return json.loads(log_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None


def get_persona_adapter_status() -> dict[str, dict]:
    """Return ``{persona: {trained: bool, mtime: iso|None, pairs_used: int|None}}``.

    One entry per sender_type cohort (internal / external_client / personal /
    automated) — "unknown" excluded since Phase 2 doesn't train an adapter
    for it. Used by the stats endpoint and the doctor check so the user
    can see which personas have a trained adapter without poking the
    filesystem.

    ``mtime`` and ``pairs_used`` come from the adapter's `meta.json`
    (written by `scripts/finetune_lora.py`) when present; otherwise mtime
    falls back to the safetensors file's mtime and pairs_used is None.
    """
    from app.core.settings import get_persona_adapter_path

    out: dict[str, dict] = {}
    for persona in ("internal", "external_client", "personal", "automated"):
        adapter_dir = get_persona_adapter_path(persona)
        sfile = adapter_dir / "adapters.safetensors"
        meta_path = adapter_dir / "meta.json"
        entry: dict = {"trained": sfile.exists(), "mtime": None, "pairs_used": None}
        if not sfile.exists():
            out[persona] = entry
            continue
        # Prefer the train metadata json; fall back to fs mtime.
        if meta_path.exists():
            try:
                meta = json.loads(meta_path.read_text(encoding="utf-8"))
                entry["mtime"] = meta.get("trained_at")
                entry["pairs_used"] = meta.get("pairs_used")
            except Exception:
                pass
        if entry["mtime"] is None:
            try:
                from datetime import datetime, timezone

                entry["mtime"] = datetime.fromtimestamp(
                    sfile.stat().st_mtime, tz=timezone.utc
                ).isoformat()
            except Exception:
                pass
        out[persona] = entry
    return out


def get_embedding_coverage(database_url: str) -> dict[str, float]:
    """Fraction of rows with non-empty embeddings, per indexed table.

    Returns ``{"chunks": 0.42, "reply_pairs": 0.31}`` when both tables exist
    and have rows. Missing / empty tables are silently omitted — so a fresh
    instance returns ``{}`` rather than zeros that would imply "indexed,
    but every row failed".

    Used to answer "is semantic retrieval actually firing on this corpus?"
    from the stats endpoint and the nightly pipeline log — without this,
    the only way to tell was to add ad-hoc logging inside the retrieval
    reranker. Mirrors ``app.retrieval.service._embedding_coverage`` which
    is per-table and connection-scoped; this is the public, multi-table,
    db-path-scoped version intended for stats callers.
    """
    from app.db.bootstrap import resolve_sqlite_path

    db_path = resolve_sqlite_path(database_url)
    if not db_path.exists():
        return {}

    coverage: dict[str, float] = {}
    conn = sqlite3.connect(db_path)
    try:
        existing_tables = {
            row[0]
            for row in conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        }
        for table in ("chunks", "reply_pairs"):
            if table not in existing_tables:
                continue
            cols = {row[1] for row in conn.execute(f"PRAGMA table_info({table})").fetchall()}
            if "embedding" not in cols:
                continue
            total = _safe_count(conn, table)
            if total == 0:
                continue
            row = conn.execute(
                f"SELECT COUNT(*) FROM {table} WHERE embedding IS NOT NULL AND LENGTH(embedding) > 0"  # noqa: S608
            ).fetchone()
            embedded = row[0] if row else 0
            coverage[table] = round(embedded / total, 4)
    except sqlite3.OperationalError:
        return coverage
    finally:
        conn.close()
    return coverage