文件预览

aisa.py

查看 Last30days 技能包中的文件内容。

文件内容

scripts/lib/aisa.py

"""AIsa API helpers for chat, search, and social retrieval."""

from __future__ import annotations

import math
import urllib.parse
from typing import Any

from . import dates, http

AISA_BASE_URL = "https://api.aisa.one"
AISA_CHAT_COMPLETIONS_URL = f"{AISA_BASE_URL}/v1/chat/completions"
AISA_TWITTER_SEARCH_URL = f"{AISA_BASE_URL}/apis/v1/twitter/tweet/advanced_search"
AISA_YOUTUBE_SEARCH_URL = f"{AISA_BASE_URL}/apis/v1/youtube/search"
AISA_TAVILY_SEARCH_URL = f"{AISA_BASE_URL}/apis/v1/tavily/search"
AISA_POLYMARKET_MARKETS_URL = f"{AISA_BASE_URL}/apis/v1/polymarket/markets"
AISA_CHAT_TIMEOUT = 60
AISA_CHAT_RETRIES = 1

DEPTH_LIMITS = {
    "quick": 6,
    "default": 12,
    "deep": 24,
}


def _headers(api_key: str) -> dict[str, str]:
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


def chat_completion(
    api_key: str,
    model: str,
    prompt: str,
    *,
    response_mime_type: str | None = None,
) -> dict[str, Any]:
    payload: dict[str, Any] = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0,
    }
    if response_mime_type == "application/json":
        payload["response_format"] = {"type": "json_object"}
    return http.post(
        AISA_CHAT_COMPLETIONS_URL,
        payload,
        headers=_headers(api_key),
        timeout=AISA_CHAT_TIMEOUT,
        retries=AISA_CHAT_RETRIES,
        max_429_retries=1,
    )


def search_twitter(
    api_key: str,
    query: str,
    from_date: str,
    depth: str = "default",
) -> dict[str, Any]:
    params = urllib.parse.urlencode(
        {
            "query": f"{query} since:{from_date}",
            "queryType": "Top",
            "count": DEPTH_LIMITS.get(depth, DEPTH_LIMITS["default"]),
        }
    )
    return http.get(
        f"{AISA_TWITTER_SEARCH_URL}?{params}",
        headers=_headers(api_key),
        timeout=45,
    )


def search_youtube(
    api_key: str,
    query: str,
    depth: str = "default",
) -> dict[str, Any]:
    params = urllib.parse.urlencode(
        {
            "engine": "youtube",
            "q": query,
            "num": DEPTH_LIMITS.get(depth, DEPTH_LIMITS["default"]),
        }
    )
    return http.get(
        f"{AISA_YOUTUBE_SEARCH_URL}?{params}",
        headers=_headers(api_key),
        timeout=45,
    )


def search_tavily(
    api_key: str,
    query: str,
    date_range: tuple[str, str] | None = None,
    *,
    limit: int | None = None,
    count: int = 5,
) -> dict[str, Any]:
    max_results = limit if limit is not None else count
    payload: dict[str, Any] = {
        "query": query,
        "topic": "news",
        "max_results": max_results,
        "include_raw_content": False,
    }
    if date_range:
        payload["start_date"] = date_range[0]
        payload["end_date"] = date_range[1]
    return http.post(
        AISA_TAVILY_SEARCH_URL,
        payload,
        headers=_headers(api_key),
        timeout=45,
    )


def search_polymarket(
    api_key: str,
    query: str,
) -> dict[str, Any]:
    params = urllib.parse.urlencode({"search": query, "status": "open"})
    return http.get(
        f"{AISA_POLYMARKET_MARKETS_URL}?{params}",
        headers=_headers(api_key),
        timeout=30,
    )


def parse_twitter_response(response: dict[str, Any], *, query: str = "") -> list[dict[str, Any]]:
    raw_items = (
        response.get("tweets")
        or response.get("results")
        or response.get("data")
        or response.get("items")
        or []
    )
    items: list[dict[str, Any]] = []
    for index, item in enumerate(raw_items):
        if not isinstance(item, dict):
            continue
        text = _first(item, "full_text", "text", "content")
        url = _first(item, "url", "tweet_url")
        if not url:
            tweet_id = _first(item, "tweet_id", "id", "rest_id")
            handle = _handle_from_item(item)
            if tweet_id and handle:
                url = f"https://x.com/{handle}/status/{tweet_id}"
        if not url:
            continue
        items.append(
            {
                "id": f"AX{index + 1}",
                "text": str(text or "").strip()[:500],
                "url": url,
                "author_handle": _handle_from_item(item),
                "date": _normalize_date(_first(item, "created_at", "date", "published_at")),
                "engagement": {
                    "likes": _to_int(_first(item, "favorite_count", "likes", "like_count")),
                    "reposts": _to_int(_first(item, "retweet_count", "reposts")),
                    "replies": _to_int(_first(item, "reply_count", "replies")),
                    "quotes": _to_int(_first(item, "quote_count", "quotes")),
                },
                "why_relevant": "AIsa Twitter search",
                "relevance": 0.8 if not query else _query_relevance(query, str(text or "")),
            }
        )
    return items


def parse_youtube_response(response: dict[str, Any], *, topic: str, from_date: str) -> list[dict[str, Any]]:
    raw_items = (
        response.get("video_results")
        or response.get("videos")
        or response.get("items")
        or response.get("results")
        or []
    )
    items: list[dict[str, Any]] = []
    for index, item in enumerate(raw_items):
        if not isinstance(item, dict):
            continue
        video_id = _first(item, "videoId", "id")
        url = _first(item, "link", "url")
        if not url and video_id:
            url = f"https://www.youtube.com/watch?v={video_id}"
        if not url:
            continue
        title = str(_first(item, "title", "name") or "")
        snippet = str(_first(item, "snippet", "description") or "")[:500]
        items.append(
            {
                "video_id": video_id or f"aisa-{index + 1}",
                "title": title,
                "url": url,
                "channel_name": str(_first(item, "channel", "channel_name", "author") or ""),
                "date": _normalize_date(_first(item, "publishedDate", "published_at", "date")),
                "engagement": {
                    "views": _to_int(_first(item, "views", "view_count")) or 0,
                    "likes": _to_int(_first(item, "likes", "like_count")),
                    "comments": None,
                },
                "duration": _first(item, "duration", "length"),
                "relevance": _youtube_relevance(topic, title, snippet),
                "why_relevant": "AIsa YouTube search",
                "description": snippet,
            }
        )
    recent = [item for item in items if item.get("date") and item["date"] >= from_date]
    return recent if len(recent) >= 3 else items


def parse_tavily_response(response: dict[str, Any], *, date_range: tuple[str, str]) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    raw_items = response.get("results") or response.get("data") or []
    items: list[dict[str, Any]] = []
    for index, item in enumerate(raw_items):
        if not isinstance(item, dict):
            continue
        url = _first(item, "url", "link")
        if not url:
            continue
        pub_date = _normalize_date(_first(item, "published_date", "date"))
        if pub_date and not (date_range[0] <= pub_date <= date_range[1]):
            continue
        items.append(
            {
                "id": f"AT{index + 1}",
                "title": str(_first(item, "title") or ""),
                "url": url,
                "source_domain": urllib.parse.urlparse(url).netloc.strip().lower(),
                "snippet": str(_first(item, "content", "snippet", "raw_content") or "")[:500],
                "date": pub_date,
                "relevance": 0.8,
                "why_relevant": "AIsa Tavily search",
            }
        )
    artifact = {"label": "aisa-tavily", "webSearchQueries": [response.get("query", "")], "resultCount": len(items)}
    return items, artifact


def parse_polymarket_response(response: dict[str, Any], *, topic: str = "") -> list[dict[str, Any]]:
    raw_items = response.get("markets") or response.get("data") or response.get("results") or []
    items: list[dict[str, Any]] = []
    for index, item in enumerate(raw_items):
        if not isinstance(item, dict):
            continue
        question = str(_first(item, "question", "title", "name") or "").strip()
        if not question:
            continue
        url = _first(item, "url")
        slug = _first(item, "market_slug", "slug")
        market_id = _first(item, "condition_id", "id", "market_id")
        if not url:
            if slug:
                url = f"https://polymarket.com/event/{slug}"
            elif market_id:
                url = f"https://polymarket.com/event/{market_id}"
        if not url:
            continue
        items.append(
            {
                "id": f"AP{index + 1}",
                "title": question,
                "url": url,
                # end_time/start_time are Unix timestamps; _normalize_date handles them
                # via the float branch. Prefer end_time (when the market resolves) over
                # start_time so the "date" reflects recency of the prediction window.
                "date": _normalize_date(_first(item, "end_time", "updatedAt", "end_date", "date", "start_time")),
                "probability": _normalize_probability(_first(item, "probability", "yes_price", "price")),
                "volume": _to_float(_first(item, "volume_total", "volume_1_month", "volume_1_week", "volume")),
                "container": "Polymarket",
                "why_relevant": "AIsa Polymarket search",
                "relevance": _query_relevance(topic, question) if topic else 0.8,
            }
        )
    return items


def _handle_from_item(item: dict[str, Any]) -> str:
    return str(
        _first(item, "screen_name", "author_handle", "username", "user_name", "handle") or ""
    ).lstrip("@")


def _first(item: dict[str, Any], *keys: str) -> Any:
    for key in keys:
        value = item.get(key)
        if value not in (None, ""):
            return value
    return None


def _normalize_date(value: object) -> str | None:
    if value is None:
        return None
    parsed = dates.parse_date(str(value).strip())
    if not parsed:
        return None
    return parsed.date().isoformat()


def _to_int(value: Any) -> int | None:
    if value in (None, ""):
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def _to_float(value: Any) -> float | None:
    if value in (None, ""):
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _normalize_probability(value: Any) -> float | None:
    prob = _to_float(value)
    if prob is None:
        return None
    if prob > 1:
        return prob / 100.0
    return prob


def _query_relevance(topic: str, text: str) -> float:
    topic_tokens = {token for token in topic.lower().split() if len(token) > 2}
    if not topic_tokens:
        return 0.8
    text_lower = text.lower()
    overlap = sum(1 for token in topic_tokens if token in text_lower)
    return min(1.0, max(0.4, overlap / max(1, len(topic_tokens))))


def _youtube_relevance(topic: str, title: str, snippet: str) -> float:
    text = f"{title} {snippet}".strip()
    base = _query_relevance(topic, text)
    if re_match := any(marker in text.lower() for marker in ("review", "breakdown", "reaction", "explained")):
        return min(1.0, base + 0.1)
    return base