文件预览

threads.py

查看 Last30days 技能包中的文件内容。

文件内容

scripts/lib/threads.py

"""Threads discovery for /last30days using the AISA web proxy."""

import math
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from . import aisa, log
from .relevance import token_overlap_relevance as _compute_relevance

# Result count requested from the search backend for each depth level.
# Built from (level, count) pairs so adding a level is a one-tuple change.
DEPTH_CONFIG = {
    level: {"results": count}
    for level, count in (("quick", 10), ("default", 20), ("deep", 40))
}


def _log(msg: str) -> None:
    """Send *msg* to the shared source logger under the "Threads" label."""
    source_label = "Threads"
    log.source_log(source_label, msg)

def _search_via_aisa(topic: str, from_date: str, to_date: str, depth: str, token: str) -> Dict[str, Any]:
    """Discover Threads posts for *topic* through the AISA Tavily search proxy.

    Runs a ``site:threads.net`` query, keeps results whose URL mentions
    threads.net and whose date (when one is present) falls inside the
    inclusive ``from_date``..``to_date`` window, and reshapes each survivor
    into a Threads item dict.

    Args:
        topic: Search topic appended to the ``site:threads.net`` query.
        from_date: Inclusive lower bound; compared as a plain string, so it
            is expected to be YYYY-MM-DD like the result dates.
        to_date: Inclusive upper bound, compared the same way.
        depth: Key into DEPTH_CONFIG; unknown values use the "default" entry.
        token: AISA API key handed to the proxy.

    Returns:
        Dict with one "items" list, capped at the configured result count.
    """
    config = DEPTH_CONFIG.get(depth, DEPTH_CONFIG["default"])
    result = aisa.search_tavily(token, f"site:threads.net {topic}", limit=config["results"])
    web_items, _ = aisa.parse_tavily_response(result, date_range=(from_date, to_date))

    items: List[Dict[str, Any]] = []
    # idx counts every web result (kept or not), so the "TH<n>" ids reflect the
    # position in the search response and may have gaps after filtering.
    for idx, entry in enumerate(web_items, start=1):
        # A present-but-null (or non-string) "url" would make the substring
        # test below raise TypeError; treat it as "no URL" and skip the entry.
        raw_url = entry.get("url")
        url = raw_url if isinstance(raw_url, str) else ""
        if "threads.net" not in url:
            continue

        # Undated results are kept; dated ones must fall inside the window.
        date_str = entry.get("date")
        if date_str and not (from_date <= date_str <= to_date):
            continue

        text = entry.get("title") or entry.get("snippet") or topic
        items.append({
            "id": f"TH{idx}",
            "handle": "",
            "display_name": "",
            "text": text,
            "url": url,
            "date": date_str,
            # No engagement figures are read from the web result, so zeros are emitted.
            "engagement": {"likes": 0, "replies": 0, "reposts": 0, "quotes": 0},
            "relevance": entry.get("relevance", 0.5),
            "why_relevant": "Threads web result via AISA",
        })
    return {"items": items[: config["results"]]}


def _extract_core_subject(topic: str) -> str:
    """Reduce a verbose query to its core subject for Threads search.

    Delegates to ``query.extract_core_subject``, passing a Threads-specific
    set of filler words as the ``noise`` argument.
    """
    from .query import extract_core_subject as _extract

    filler_words = frozenset((
        "best", "top", "good", "great", "awesome",
        "latest", "new", "news", "update", "updates",
        "trending", "hottest", "popular", "viral",
        "practices", "features", "recommendations", "advice",
    ))
    return _extract(topic, noise=filler_words)


def _parse_date(item: Dict[str, Any]) -> Optional[str]:
    """Parse date from Threads item to YYYY-MM-DD.

    Tries common timestamp fields: taken_at (unix), created_at (ISO),
    and falls back to any date-like string field.
    """
    # Unix timestamp (taken_at is common in Meta APIs)
    for key in ("taken_at", "create_time"):
        ts = item.get(key)
        if ts:
            try:
                from . import dates
                return dates.timestamp_to_date(int(ts))
            except (ValueError, TypeError):
                pass

    # ISO 8601 string
    for key in ("created_at", "published_at", "date"):
        val = item.get(key)
        if val and isinstance(val, str):
            try:
                dt = datetime.fromisoformat(val.replace("Z", "+00:00"))
                return dt.strftime("%Y-%m-%d")
            except (ValueError, TypeError):
                pass

    return None


def _parse_items(raw_items: List[Dict[str, Any]], core_topic: str) -> List[Dict[str, Any]]:
    """Parse raw Threads items into normalized dicts."""
    items = []
    for i, raw in enumerate(raw_items):
        post_id = str(
            raw.get("id")
            or raw.get("pk")
            or raw.get("code")
            or f"TH{i + 1}"
        )
        text = raw.get("text") or raw.get("caption") or raw.get("content") or ""
        if isinstance(text, dict):
            text = text.get("text", "")

        # Author extraction
        user = raw.get("user") or raw.get("author") or {}
        if isinstance(user, dict):
            handle = user.get("username") or user.get("handle") or ""
            display_name = user.get("full_name") or user.get("displayName") or handle
        elif isinstance(user, str):
            handle = user
            display_name = user
        else:
            handle = ""
            display_name = ""

        # Engagement metrics
        likes = raw.get("like_count") or raw.get("likes") or 0
        replies = raw.get("reply_count") or raw.get("replies") or 0
        reposts = raw.get("repost_count") or raw.get("reposts") or 0
        quotes = raw.get("quote_count") or raw.get("quotes") or 0

        date_str = _parse_date(raw)

        # Build URL
        code = raw.get("code") or raw.get("shortcode") or ""
        url = raw.get("url") or raw.get("share_url") or ""
        if not url and code:
            url = f"https://www.threads.net/post/{code}"
        elif not url and handle and post_id:
            url = f"https://www.threads.net/@{handle}/post/{post_id}"

        # Relevance: position-based plus engagement boost for short social posts.
        rank_score = max(0.3, 1.0 - (i * 0.02))
        engagement_boost = min(0.2, math.log1p(likes + reposts) / 40)
        text_relevance = _compute_relevance(core_topic, text)
        relevance = min(1.0, text_relevance * 0.5 + rank_score * 0.3 + engagement_boost + 0.1)

        items.append({
            "id": post_id,
            "handle": handle,
            "display_name": display_name,
            "text": text,
            "url": url,
            "date": date_str,
            "engagement": {
                "likes": likes,
                "replies": replies,
                "reposts": reposts,
                "quotes": quotes,
            },
            "relevance": round(relevance, 2),
            "why_relevant": f"Threads: @{handle}: {text[:60]}" if text else f"Threads: {handle}",
        })
    return items


def search_threads(
    topic: str,
    from_date: str,
    to_date: str,
    depth: str = "default",
    token: Optional[str] = None,
) -> Dict[str, Any]:
    """Search Threads using the hosted AISA discovery path.

    Args:
        topic: Search topic
        from_date: Start date (YYYY-MM-DD)
        to_date: End date (YYYY-MM-DD)
        depth: 'quick', 'default', or 'deep'
        token: AISA API key; when missing or empty no request is made

    Returns:
        Dict with 'items' list and optional 'error'. Without a token the
        result is an empty 'items' list plus an 'error' message.
    """
    if token:
        return _search_via_aisa(topic, from_date, to_date, depth, token)
    # No credentials: report the problem in-band instead of raising.
    return {"items": [], "error": "AISA_API_KEY not configured"}


def parse_threads_response(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pull the item list out of a Threads search response.

    Returns:
        Whatever is stored under the response's "items" key, or a new empty
        list when that key is absent.
    """
    if "items" in response:
        return response["items"]
    return []