文件预览

hackernews.py

查看 Last30days 技能包中的文件内容。

文件内容

scripts/lib/hackernews.py

"""Hacker News search via Algolia API (free, no auth required).

Uses hn.algolia.com/api/v1 for story discovery and comment enrichment.
No API key needed - just HTTP calls via stdlib urllib.
"""

import datetime
import html
import math
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional

import re

from . import http, log
from .query import NOISE_WORDS, extract_core_subject
from .relevance import token_overlap_relevance

# Common HN prefixes that can cause false-positive keyword matches
_HN_PREFIXES = re.compile(r"^(Tell HN|Show HN|Ask HN|Launch HN)\s*:\s*", re.IGNORECASE)

ALGOLIA_SEARCH_URL = "https://hn.algolia.com/api/v1/search"
ALGOLIA_SEARCH_BY_DATE_URL = "https://hn.algolia.com/api/v1/search_by_date"
ALGOLIA_ITEM_URL = "https://hn.algolia.com/api/v1/items"

DEPTH_CONFIG = {
    "quick": 15,
    "default": 30,
    "deep": 60,
}

ENRICH_LIMITS = {
    "quick": 3,
    "default": 5,
    "deep": 10,
}


def _log(msg: str):
    log.source_log("HN", msg)


def _date_to_unix(date_str: str) -> int:
    """Convert YYYY-MM-DD to Unix timestamp (start of day UTC)."""
    parts = date_str.split("-")
    year, month, day = int(parts[0]), int(parts[1]), int(parts[2])
    dt = datetime.datetime(year, month, day, tzinfo=datetime.timezone.utc)
    return int(dt.timestamp())


def _unix_to_date(ts: int) -> str:
    """Convert Unix timestamp to YYYY-MM-DD."""
    dt = datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc)
    return dt.strftime("%Y-%m-%d")


def _strip_html(text: str) -> str:
    """Strip HTML tags and decode entities from HN comment text."""
    import re
    text = html.unescape(text)
    text = re.sub(r'<p>', '\n', text)
    text = re.sub(r'<[^>]+>', '', text)
    return text.strip()


def search_hackernews(
    topic: str,
    from_date: str,
    to_date: str,
    depth: str = "default",
) -> Dict[str, Any]:
    """Search Hacker News via Algolia API.

    Args:
        topic: Search topic
        from_date: Start date (YYYY-MM-DD)
        to_date: End date (YYYY-MM-DD)
        depth: 'quick', 'default', or 'deep'

    Returns:
        Dict with Algolia response (contains 'hits' list).
    """
    count = DEPTH_CONFIG.get(depth, DEPTH_CONFIG["default"])
    from_ts = _date_to_unix(from_date)
    to_ts = _date_to_unix(to_date) + 86400  # Include the end date

    # Use extracted core subject instead of raw topic for cleaner Algolia matching
    core = extract_core_subject(topic)
    _log(f"Searching for '{core}' (raw: '{topic}', since {from_date}, count={count})")

    # Use relevance-sorted search with minimum engagement filter.
    # NOTE: restrictSearchableAttributes=title omitted intentionally — it would
    # miss Ask HN/Show HN threads where the topic appears in the body.
    params = {
        "query": core,
        "tags": "story",
        "numericFilters": f"created_at_i>{from_ts},created_at_i<{to_ts},points>2",
        "hitsPerPage": str(count),
    }

    from urllib.parse import urlencode
    url = f"{ALGOLIA_SEARCH_URL}?{urlencode(params)}"

    try:
        response = http.request("GET", url, timeout=30)
    except http.HTTPError as e:
        _log(f"Search failed: {e}")
        return {"hits": [], "error": str(e)}
    except Exception as e:
        _log(f"Search failed: {e}")
        return {"hits": [], "error": str(e)}

    hits = response.get("hits", [])
    _log(f"Found {len(hits)} stories")
    return response


def _title_matches_query(title: str, query: str, author: str = "") -> bool:
    """Check whether a meaningful fraction of the query's content words appear
    in the title (after stripping HN prefixes like 'Show HN:').

    Requires at least ceil(N/2) of the noise-stripped query tokens to appear
    in the title — this tolerates short, implicit headlines like
    "The next evolution of the Agents SDK" matching the query
    "openai agents sdk" (2 of 3 content words present) while still rejecting
    unrelated stories that only share one stop-word.
    """
    if not query:
        return True
    stripped = _HN_PREFIXES.sub("", title).strip()
    check_text = stripped.lower()
    # Noise-strip the query with the same set used when composing the
    # Algolia query, so the post-filter doesn't demand words we already
    # intentionally dropped upstream.
    raw_words = query.lower().split()
    content_words = [w for w in raw_words if w not in NOISE_WORDS]
    if not content_words:
        # Query was pure noise — fall back to raw tokens to avoid matching everything.
        content_words = raw_words
    if not content_words:
        return True
    hits = sum(1 for w in content_words if w in check_text)
    required = max(1, (len(content_words) + 1) // 2)
    return hits >= required


def parse_hackernews_response(response: Dict[str, Any], query: str = "") -> List[Dict[str, Any]]:
    """Parse Algolia response into normalized item dicts.

    Args:
        response: Algolia search response
        query: Original search query for token-overlap relevance scoring

    Returns:
        List of item dicts ready for normalization.
    """
    hits = response.get("hits", [])
    # Post-filter: remove items where query only matched an HN prefix like "Tell HN:"
    if query:
        before = len(hits)
        hits = [
            h for h in hits
            if _title_matches_query(h.get("title", ""), query, h.get("author", ""))
        ]
        dropped = before - len(hits)
        if dropped:
            _log(f"Prefix filter removed {dropped}/{before} false-positive hits for '{query}'")
    items = []

    for i, hit in enumerate(hits):
        object_id = hit.get("objectID", "")
        points = hit.get("points") or 0
        num_comments = hit.get("num_comments") or 0
        created_at_i = hit.get("created_at_i")

        date_str = None
        if created_at_i:
            date_str = _unix_to_date(created_at_i)

        # Article URL vs HN discussion URL
        article_url = hit.get("url") or ""
        hn_url = f"https://news.ycombinator.com/item?id={object_id}"

        # Relevance: blend Algolia rank with token-overlap content matching
        rank_score = max(0.3, 1.0 - (i * 0.02))  # 1.0 -> 0.3 over 35 items
        engagement_boost = min(0.2, math.log1p(points) / 40)
        if query:
            content_score = token_overlap_relevance(query, hit.get("title", ""))
            relevance = min(1.0, 0.6 * rank_score + 0.4 * content_score + engagement_boost)
        else:
            relevance = min(1.0, rank_score * 0.7 + engagement_boost + 0.1)

        items.append({
            "id": object_id,
            "title": hit.get("title", ""),
            "url": article_url,
            "hn_url": hn_url,
            "author": hit.get("author", ""),
            "date": date_str,
            "engagement": {
                "points": points,
                "comments": num_comments,
            },
            "relevance": round(relevance, 2),
            "why_relevant": f"HN story about {hit.get('title', 'topic')[:60]}",
        })

    return items


def _fetch_item_comments(object_id: str, max_comments: int = 5) -> Dict[str, Any]:
    """Fetch top-level comments for a story from Algolia items endpoint.

    Args:
        object_id: HN story ID
        max_comments: Max comments to return

    Returns:
        Dict with 'comments' list and 'comment_insights' list.
    """
    url = f"{ALGOLIA_ITEM_URL}/{object_id}"

    try:
        data = http.request("GET", url, timeout=15)
    except Exception as e:
        _log(f"Failed to fetch comments for {object_id}: {e}")
        return {"comments": [], "comment_insights": []}

    children = data.get("children", [])

    # Sort by points (highest first), filter to actual comments
    real_comments = [
        c for c in children
        if c.get("text") and c.get("author")
    ]
    real_comments.sort(key=lambda c: c.get("points") or 0, reverse=True)

    comments = []
    insights = []
    for c in real_comments[:max_comments]:
        text = _strip_html(c.get("text", ""))
        excerpt = text[:300] + "..." if len(text) > 300 else text
        comments.append({
            "author": c.get("author", ""),
            "text": excerpt,
            "points": c.get("points") or 0,
        })
        # First sentence as insight
        first_sentence = text.split(". ")[0].split("\n")[0][:200]
        if first_sentence:
            insights.append(first_sentence)

    return {"comments": comments, "comment_insights": insights}


def enrich_top_stories(
    items: List[Dict[str, Any]],
    depth: str = "default",
) -> List[Dict[str, Any]]:
    """Fetch comments for top N stories by points.

    Args:
        items: Parsed HN items
        depth: Research depth (controls how many to enrich)

    Returns:
        Items with top_comments and comment_insights added.
    """
    if not items:
        return items

    limit = ENRICH_LIMITS.get(depth, ENRICH_LIMITS["default"])

    # Sort by points to enrich the most popular stories
    by_points = sorted(
        range(len(items)),
        key=lambda i: items[i].get("engagement", {}).get("points", 0),
        reverse=True,
    )
    to_enrich = by_points[:limit]

    _log(f"Enriching top {len(to_enrich)} stories with comments")

    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = {
            executor.submit(
                _fetch_item_comments,
                items[idx]["id"],
            ): idx
            for idx in to_enrich
        }

        for future in as_completed(futures):
            idx = futures[future]
            try:
                result = future.result(timeout=15)
                items[idx]["top_comments"] = result["comments"]
                items[idx]["comment_insights"] = result["comment_insights"]
            except (KeyError, TypeError, OSError) as exc:
                _log(f"Comment enrichment failed for story {items[idx].get('id', '?')}: {type(exc).__name__}: {exc}")
                items[idx]["top_comments"] = []
                items[idx]["comment_insights"] = []

    return items