# File contents
# scripts/lib/schema.py
"""Core data model for the last30days pipeline. Version in lib/__init__.py."""
from __future__ import annotations
from dataclasses import asdict, dataclass, field, is_dataclass
from typing import Any, Literal
def _drop_none(value: Any) -> Any:
    """Recursively remove None values from dataclass-derived structures.

    Dataclass instances are first converted with ``asdict``. Dict entries
    whose value is None are dropped. Lists and tuples are walked element by
    element; None *elements* are kept so positions stay stable. Any other
    value is returned unchanged.
    """
    # is_dataclass() is also true for dataclass classes, which asdict()
    # rejects with TypeError, so only instances are converted.
    if is_dataclass(value) and not isinstance(value, type):
        return _drop_none(asdict(value))
    if isinstance(value, dict):
        return {
            key: _drop_none(item)
            for key, item in value.items()
            if item is not None
        }
    if isinstance(value, list):
        return [_drop_none(item) for item in value]
    if isinstance(value, tuple):
        # asdict() keeps tuples as tuples, so they must be walked too or
        # None-valued dict entries nested inside them would survive.
        cleaned = [_drop_none(item) for item in value]
        if hasattr(value, "_fields"):
            # namedtuple constructors take positional fields, not an iterable.
            return type(value)(*cleaned)
        return tuple(cleaned)
    return value
def _first_non_none(*values: Any) -> Any:
    """Return the first argument that is not None, or None when there is none.

    Falsy values such as 0, "" or False count as present; only None is skipped.
    """
    return next((candidate for candidate in values if candidate is not None), None)
@dataclass(frozen=True)
class ProviderRuntime:
    """Resolved runtime provider selection.

    Declared frozen, so fields cannot be reassigned after construction.
    The first three fields are required; the last two have defaults.
    """
    # One of the two literal provider names allowed by the annotation.
    reasoning_provider: Literal["aisa", "local"]
    # Model identifier strings; presumably consumed by the planning and
    # reranking stages respectively -- confirm against callers.
    planner_model: str
    rerank_model: str
    # Optional; "aisa" is the only non-None value the annotation admits.
    x_search_backend: Literal["aisa"] | None = None
    # Dedicated model for the fun/vibes scoring pass. Falls back to
    # rerank_model at resolve time if unset, so existing configs keep
    # working without a fun_model pin.
    fun_model: str = ""
@dataclass(frozen=True)
class SubQuery:
    """One retrieval unit emitted by the planner.

    Construction raises ValueError when ``sources`` is empty or when
    ``weight`` is zero or negative. ``weight`` defaults to 1.0.
    """

    label: str
    search_query: str
    ranking_query: str
    sources: list[str]
    weight: float = 1.0

    def __post_init__(self) -> None:
        """Validate the freshly constructed instance (sources first, then weight)."""
        has_sources = bool(self.sources)
        if not has_sources:
            raise ValueError("SubQuery must have at least one source")
        weight = self.weight
        if weight <= 0:
            raise ValueError(f"SubQuery weight must be positive, got {weight}")
@dataclass
class QueryPlan:
    """Planner output.

    A plain (mutable) dataclass. Every field except ``notes`` is required;
    ``notes`` defaults to a fresh empty list per instance.
    """
    intent: str
    freshness_mode: str
    cluster_mode: str
    # The topic string; presumably the user's original input -- TODO confirm.
    raw_topic: str
    # Retrieval units that make up the plan.
    subqueries: list[SubQuery]
    # Float weight per string key (per the annotation); keys are presumably
    # source names -- verify against the planner.
    source_weights: dict[str, float]
    notes: list[str] = field(default_factory=list)
@dataclass
class SourceItem:
    """Generic normalized evidence item.

    ``item_id``, ``source``, ``title``, ``body`` and ``url`` are required;
    every other field has a default.
    """
    item_id: str
    source: str
    title: str
    body: str
    url: str
    # Optional descriptive fields; None when not supplied.
    author: str | None = None
    container: str | None = None
    published_at: str | None = None
    # Defaults to "low" when the caller does not state a confidence.
    date_confidence: Literal["high", "med", "low"] = "low"
    # default_factory gives each instance its own empty dict.
    engagement: dict[str, float | int] = field(default_factory=dict)
    relevance_hint: float = 0.5
    why_relevant: str = ""
    snippet: str = ""
    metadata: dict[str, Any] = field(default_factory=dict)
    # Signal fields populated by signals.annotate_stream (after construction)
    local_relevance: float | None = None
    freshness: int | None = None
    engagement_score: float | None = None
    source_quality: float | None = None
    local_rank_score: float | None = None
@dataclass
class Candidate:
    """Global candidate after fusion and reranking.

    Fields from ``candidate_id`` through ``rrf_score`` are required; the
    fields after that all have defaults.
    """
    candidate_id: str
    item_id: str
    # Single source name; helpers in this module fall back to it when
    # ``sources`` is empty.
    source: str
    title: str
    url: str
    snippet: str
    subquery_labels: list[str]
    # Integer rank per string key; presumably keyed by source or subquery
    # -- confirm against the fusion code.
    native_ranks: dict[str, int]
    local_relevance: float
    freshness: int
    # May be None, unlike the other required numeric fields.
    engagement: int | float | None
    source_quality: float
    rrf_score: float
    # default_factory gives each instance its own empty container.
    sources: list[str] = field(default_factory=list)
    source_items: list[SourceItem] = field(default_factory=list)
    # Optional scoring/annotation fields; None (or 0.0 for final_score)
    # unless set by the caller.
    rerank_score: float | None = None
    final_score: float = 0.0
    explanation: str | None = None
    fun_score: float | None = None
    fun_explanation: str | None = None
    cluster_id: str | None = None
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class Cluster:
    """A ranked group of related candidates.

    Construction raises ValueError if any entry of ``representative_ids``
    is absent from ``candidate_ids``. ``uncertainty`` defaults to None.
    """

    cluster_id: str
    title: str
    candidate_ids: list[str]
    representative_ids: list[str]
    sources: list[str]
    score: float
    uncertainty: Literal["single-source", "thin-evidence"] | None = None

    def __post_init__(self) -> None:
        """Reject representatives that are not members of the cluster."""
        representatives = set(self.representative_ids)
        members = set(self.candidate_ids)
        if representatives.issubset(members):
            return
        raise ValueError("representative_ids must be a subset of candidate_ids")
@dataclass
class Report:
    """Final pipeline output.

    Fields from ``topic`` through ``errors_by_source`` are required;
    ``warnings`` and ``artifacts`` default to fresh empty containers.
    """
    topic: str
    # Range bounds and generation time are stored as strings; the exact
    # format is not fixed here -- presumably ISO dates, TODO confirm.
    range_from: str
    range_to: str
    generated_at: str
    provider_runtime: ProviderRuntime
    query_plan: QueryPlan
    clusters: list[Cluster]
    ranked_candidates: list[Candidate]
    # Items grouped under a string key (the source, per the field name).
    items_by_source: dict[str, list[SourceItem]]
    # One error string per string key (the source, per the field name).
    errors_by_source: dict[str, str]
    warnings: list[str] = field(default_factory=list)
    artifacts: dict[str, Any] = field(default_factory=dict)
@dataclass
class RetrievalBundle:
    """Structured retrieval output before global ranking.

    All fields default to fresh empty dicts, so ``RetrievalBundle()`` yields
    an empty bundle that is then filled through ``add_items``.
    """

    # Keyed by the (label, source) tuple that add_items builds.
    # NOTE(review): the field name reads source-then-query while the key is
    # label-then-source -- confirm consumers expect this order.
    items_by_source_and_query: dict[tuple[str, str], list[SourceItem]] = field(default_factory=dict)
    # The same items, grouped by source only.
    items_by_source: dict[str, list[SourceItem]] = field(default_factory=dict)
    errors_by_source: dict[str, str] = field(default_factory=dict)
    artifacts: dict[str, Any] = field(default_factory=dict)

    def add_items(self, label: str, source: str, items: list[SourceItem]) -> None:
        """Append items to both items_by_source_and_query and items_by_source.

        Args:
            label: First component of the (label, source) key.
            source: Second key component, and the key into items_by_source.
            items: Items to append; existing entries are extended, not replaced.
        """
        # Materialize once: a one-shot iterable would be exhausted by the
        # first extend() and leave items_by_source without the new items.
        batch = list(items)
        self.items_by_source_and_query.setdefault((label, source), []).extend(batch)
        self.items_by_source.setdefault(source, []).extend(batch)
def to_dict(value: Any) -> Any:
    """Serialize dataclasses and nested containers.

    Thin public wrapper that delegates to ``_drop_none``.
    """
    serialized = _drop_none(value)
    return serialized
def provider_runtime_from_dict(payload: dict[str, Any]) -> ProviderRuntime:
    """Build a ProviderRuntime from its dictionary form.

    ``reasoning_provider``, ``planner_model`` and ``rerank_model`` must be
    present (KeyError otherwise). ``x_search_backend`` falls back to None
    and ``fun_model`` to the empty string when absent.
    """
    # Required keys are read first, in declaration order.
    reasoning_provider = payload["reasoning_provider"]
    planner_model = payload["planner_model"]
    rerank_model = payload["rerank_model"]
    # Optional keys.
    x_search_backend = payload.get("x_search_backend")
    fun_model = payload.get("fun_model", "")
    return ProviderRuntime(
        reasoning_provider=reasoning_provider,
        planner_model=planner_model,
        rerank_model=rerank_model,
        x_search_backend=x_search_backend,
        fun_model=fun_model,
    )
def subquery_from_dict(payload: dict[str, Any]) -> SubQuery:
    """Build a SubQuery from its dictionary form.

    ``label``, ``search_query`` and ``ranking_query`` must be present
    (KeyError otherwise). A missing or falsy ``sources`` becomes an empty
    list and a missing or falsy ``weight`` becomes 1.0; SubQuery's own
    validation then applies to the resulting values.
    """
    label = payload["label"]
    search_query = payload["search_query"]
    ranking_query = payload["ranking_query"]
    sources = list(payload.get("sources") or [])
    # `or` (not an is-None test): 0 and "" are also replaced by 1.0.
    weight = float(payload.get("weight") or 1.0)
    return SubQuery(
        label=label,
        search_query=search_query,
        ranking_query=ranking_query,
        sources=sources,
        weight=weight,
    )
def query_plan_from_dict(payload: dict[str, Any]) -> QueryPlan:
    """Build a QueryPlan from its dictionary form.

    ``intent``, ``freshness_mode``, ``cluster_mode`` and ``raw_topic`` must
    be present (KeyError otherwise). Missing or falsy ``subqueries``,
    ``source_weights`` and ``notes`` become empty containers.
    """
    intent = payload["intent"]
    freshness_mode = payload["freshness_mode"]
    cluster_mode = payload["cluster_mode"]
    raw_topic = payload["raw_topic"]
    # Each raw entry goes through subquery_from_dict, in order.
    subqueries = list(map(subquery_from_dict, payload.get("subqueries") or []))
    source_weights = dict(payload.get("source_weights") or {})
    notes = list(payload.get("notes") or [])
    return QueryPlan(
        intent=intent,
        freshness_mode=freshness_mode,
        cluster_mode=cluster_mode,
        raw_topic=raw_topic,
        subqueries=subqueries,
        source_weights=source_weights,
        notes=notes,
    )
def source_item_from_dict(payload: dict[str, Any]) -> SourceItem:
    """Build a SourceItem from its dictionary form.

    ``item_id``, ``source`` and ``title`` must be present (KeyError
    otherwise). Text fields fall back to "" and ``date_confidence`` to "low"
    when missing or falsy; ``relevance_hint`` falls back to 0.5 only when
    missing or None. Each signal field is taken from the top level of the
    payload if present there, otherwise from ``metadata``.
    """
    meta = payload.get("metadata") or {}
    # Keyword arguments are assembled in field order so a malformed payload
    # fails at the same field it always did.
    kwargs: dict[str, Any] = {
        "item_id": payload["item_id"],
        "source": payload["source"],
        "title": payload["title"],
    }
    for text_key in ("body", "url"):
        kwargs[text_key] = payload.get(text_key) or ""
    for optional_key in ("author", "container", "published_at"):
        kwargs[optional_key] = payload.get(optional_key)
    kwargs["date_confidence"] = payload.get("date_confidence") or "low"
    kwargs["engagement"] = dict(payload.get("engagement") or {})
    kwargs["relevance_hint"] = float(_first_non_none(payload.get("relevance_hint"), 0.5))
    for text_key in ("why_relevant", "snippet"):
        kwargs[text_key] = payload.get(text_key) or ""
    kwargs["metadata"] = dict(meta)
    signal_keys = (
        "local_relevance",
        "freshness",
        "engagement_score",
        "source_quality",
        "local_rank_score",
    )
    for signal_key in signal_keys:
        # A top-level value wins; metadata is the fallback location.
        kwargs[signal_key] = _first_non_none(payload.get(signal_key), meta.get(signal_key))
    return SourceItem(**kwargs)
def candidate_from_dict(payload: dict[str, Any]) -> Candidate:
    """Build a Candidate from its dictionary form.

    ``candidate_id``, ``item_id``, ``source`` and ``title`` must be present
    (KeyError otherwise). Numeric fields fall back to zero only when missing
    or None; ``rerank_score`` and ``fun_score`` stay None when missing or
    None. Missing or falsy containers become empty ones.
    """

    def number(key: str, cast: Any, fallback: Any) -> Any:
        # Missing or None falls back; an explicit 0 is preserved.
        return cast(_first_non_none(payload.get(key), fallback))

    def optional_float(key: str) -> float | None:
        if payload.get(key) is None:
            return None
        return float(payload[key])

    def int_ranks() -> dict[str, int]:
        raw_ranks = payload.get("native_ranks") or {}
        return {name: int(rank) for name, rank in raw_ranks.items()}

    # Arguments are evaluated in field order, matching the dataclass layout.
    return Candidate(
        candidate_id=payload["candidate_id"],
        item_id=payload["item_id"],
        source=payload["source"],
        title=payload["title"],
        url=payload.get("url") or "",
        snippet=payload.get("snippet") or "",
        subquery_labels=list(payload.get("subquery_labels") or []),
        native_ranks=int_ranks(),
        local_relevance=number("local_relevance", float, 0.0),
        freshness=number("freshness", int, 0),
        engagement=payload.get("engagement"),
        source_quality=number("source_quality", float, 0.0),
        rrf_score=number("rrf_score", float, 0.0),
        sources=list(payload.get("sources") or []),
        source_items=list(map(source_item_from_dict, payload.get("source_items") or [])),
        rerank_score=optional_float("rerank_score"),
        final_score=number("final_score", float, 0.0),
        explanation=payload.get("explanation"),
        fun_score=optional_float("fun_score"),
        fun_explanation=payload.get("fun_explanation"),
        cluster_id=payload.get("cluster_id"),
        metadata=dict(payload.get("metadata") or {}),
    )
def cluster_from_dict(payload: dict[str, Any]) -> Cluster:
    """Build a Cluster from its dictionary form.

    ``cluster_id`` and ``title`` must be present (KeyError otherwise).
    Missing or falsy id/source lists become empty lists; ``score`` falls
    back to 0.0 only when missing or None. Cluster's own validation of
    ``representative_ids`` still applies.
    """
    cluster_id = payload["cluster_id"]
    title = payload["title"]
    candidate_ids = list(payload.get("candidate_ids") or [])
    representative_ids = list(payload.get("representative_ids") or [])
    sources = list(payload.get("sources") or [])
    score = float(_first_non_none(payload.get("score"), 0.0))
    return Cluster(
        cluster_id=cluster_id,
        title=title,
        candidate_ids=candidate_ids,
        representative_ids=representative_ids,
        sources=sources,
        score=score,
        uncertainty=payload.get("uncertainty"),
    )
def report_from_dict(payload: dict[str, Any]) -> Report:
    """Build a Report from its dictionary form.

    ``topic``, ``range_from``, ``range_to``, ``generated_at``,
    ``provider_runtime`` and ``query_plan`` must be present (KeyError
    otherwise). Nested structures are rebuilt through the matching
    ``*_from_dict`` helpers; missing or falsy collections become empty ones.
    """
    topic = payload["topic"]
    range_from = payload["range_from"]
    range_to = payload["range_to"]
    generated_at = payload["generated_at"]
    provider_runtime = provider_runtime_from_dict(payload["provider_runtime"])
    query_plan = query_plan_from_dict(payload["query_plan"])
    clusters = list(map(cluster_from_dict, payload.get("clusters") or []))
    ranked_candidates = list(map(candidate_from_dict, payload.get("ranked_candidates") or []))
    # Rebuild the per-source item lists one source at a time.
    items_by_source: dict[str, list[SourceItem]] = {}
    for source, raw_items in (payload.get("items_by_source") or {}).items():
        items_by_source[source] = [source_item_from_dict(raw) for raw in raw_items]
    return Report(
        topic=topic,
        range_from=range_from,
        range_to=range_to,
        generated_at=generated_at,
        provider_runtime=provider_runtime,
        query_plan=query_plan,
        clusters=clusters,
        ranked_candidates=ranked_candidates,
        items_by_source=items_by_source,
        errors_by_source=dict(payload.get("errors_by_source") or {}),
        warnings=list(payload.get("warnings") or []),
        artifacts=dict(payload.get("artifacts") or {}),
    )
def candidate_sources(candidate: Candidate) -> list[str]:
    """Return the candidate's source names.

    A non-empty ``candidate.sources`` is returned as-is (the same list
    object, not a copy). Otherwise the result is a one-element list holding
    ``candidate.source``, or an empty list when that is falsy too.
    """
    names = candidate.sources
    if not names:
        primary = candidate.source
        names = [primary] if primary else []
    return names
def candidate_source_label(candidate: Candidate) -> str:
    """Return the candidate's sources joined with ", ", or "unknown" if it has none."""
    names = candidate_sources(candidate)
    if not names:
        return "unknown"
    return ", ".join(names)
def candidate_best_published_at(candidate: Candidate) -> str | None:
    """Return the greatest ``published_at`` among the candidate's source items.

    Items with a falsy ``published_at`` are ignored. Values are compared as
    plain strings. Returns None when no item carries a value.
    """
    stamps = [item.published_at for item in candidate.source_items if item.published_at]
    if not stamps:
        return None
    return max(stamps)
def candidate_primary_item(candidate: Candidate) -> SourceItem | None:
    """Return the source item that best represents the candidate.

    The first item whose ``source`` equals ``candidate.source`` is chosen.
    If none matches, the first item is returned. Returns None when the
    candidate has no source items.
    """
    items = candidate.source_items
    if not items:
        return None
    wanted = candidate.source
    return next((item for item in items if item.source == wanted), items[0])