文件预览

search_images.py

查看 Article Image Research 技能包中的文件内容。

文件内容

scripts/search_images.py

#!/usr/bin/env python3
"""Search image providers and return normalized candidate JSON."""

from __future__ import annotations

import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request
from datetime import datetime, timezone
from typing import Any


class SearchError(RuntimeError):
    """Raised when a provider request fails or returns an unusable payload."""


def request_json(url: str, headers: dict[str, str] | None = None) -> dict[str, Any]:
    """Fetch *url* with a GET request and return the decoded JSON object.

    Args:
        url: Fully built request URL.
        headers: Extra request headers. Entries with the same key replace the
            default ``Accept`` / ``User-Agent`` values.

    Returns:
        The response body parsed as a JSON object.

    Raises:
        SearchError: On an HTTP error status, a transport failure (including
            failures while reading the body), a body that is not valid UTF-8
            JSON, or a JSON document whose top level is not an object.
    """
    # Function-scope import keeps this block self-contained; the module's
    # top-level imports do not include http.client.
    import http.client

    request_headers = {
        "Accept": "application/json",
        "User-Agent": "article-image-research/1.0 (image research for editorial attribution)",
    }
    request_headers.update(headers or {})
    request = urllib.request.Request(url, headers=request_headers)
    try:
        with urllib.request.urlopen(request, timeout=20) as response:
            body = response.read()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise SearchError(f"HTTP {exc.code}: {detail}") from exc
    except urllib.error.URLError as exc:
        raise SearchError(str(exc)) from exc
    except (OSError, http.client.HTTPException) as exc:
        # Failures while reading the response (socket timeout, connection
        # reset, truncated body) are not wrapped in URLError, so they must be
        # translated here or they would escape the caller's SearchError handler.
        raise SearchError(f"Request failed: {exc}") from exc

    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as exc:
        raise SearchError(f"Invalid JSON response: {exc}") from exc

    # Callers immediately use ``.get`` on the result, so anything other than a
    # JSON object is unusable and is reported as a provider error.
    if not isinstance(payload, dict):
        raise SearchError(f"Unexpected JSON response type: {type(payload).__name__}")
    return payload


def build_url(base: str, params: dict[str, Any]) -> str:
    """Return *base* followed by ``?`` and the URL-encoded *params*.

    Parameters whose value is ``None`` or the empty string are left out of
    the query string; all other values are encoded as given.
    """
    kept = []
    for name, val in params.items():
        if val in (None, ""):
            continue
        kept.append((name, val))
    query_string = urllib.parse.urlencode(kept)
    return f"{base}?{query_string}"


def search_openverse(query: str, limit: int, page: int, include_raw: bool) -> list[dict[str, Any]]:
    """Search the Openverse image endpoint and normalize every hit.

    Args:
        query: Search text, sent as ``q``.
        limit: Number of results requested, sent as ``page_size``.
        page: Result page, sent as ``page``.
        include_raw: When true, attach the untouched provider record under
            the ``"raw"`` key of each candidate.

    Returns:
        One normalized candidate dict per entry of the response's
        ``"results"`` list.

    Raises:
        SearchError: Propagated from ``request_json``.
    """
    endpoint = build_url(
        "https://api.openverse.org/v1/images/",
        {"q": query, "page_size": limit, "page": page, "mature": "false"},
    )
    payload = request_json(endpoint)

    def normalize(hit: dict[str, Any]) -> dict[str, Any]:
        # Map one Openverse record onto the shared candidate schema.
        candidate = {
            "provider": "openverse",
            "title": hit.get("title"),
            "image_url": hit.get("url"),
            "thumbnail_url": hit.get("thumbnail"),
            "landing_url": hit.get("foreign_landing_url"),
            "creator": hit.get("creator"),
            "creator_url": hit.get("creator_url"),
            "license": hit.get("license"),
            "license_url": hit.get("license_url"),
            # Fall back to the record's "provider" field when "source" is empty.
            "source": hit.get("source") or hit.get("provider"),
            "attribution": hit.get("attribution"),
            "width": hit.get("width"),
            "height": hit.get("height"),
        }
        if include_raw:
            candidate["raw"] = hit
        return candidate

    return [normalize(hit) for hit in payload.get("results", [])]


def search_pexels(query: str, limit: int, page: int, include_raw: bool) -> list[dict[str, Any]]:
    """Search the Pexels photo endpoint and normalize every hit.

    Args:
        query: Search text, sent as ``query``.
        limit: Number of results requested, sent as ``per_page``.
        page: Result page, sent as ``page``.
        include_raw: When true, attach the untouched provider record under
            the ``"raw"`` key of each candidate.

    Returns:
        One normalized candidate dict per entry of the response's
        ``"photos"`` list.

    Raises:
        SearchError: If ``PEXELS_API_KEY`` is unset or empty, or propagated
            from ``request_json``.
    """
    token = os.environ.get("PEXELS_API_KEY")
    if not token:
        raise SearchError("PEXELS_API_KEY is required for provider=pexels")

    endpoint = build_url(
        "https://api.pexels.com/v1/search",
        {"query": query, "per_page": limit, "page": page},
    )
    payload = request_json(endpoint, {"Authorization": token})

    def normalize(photo: dict[str, Any]) -> dict[str, Any]:
        # Map one Pexels record onto the shared candidate schema.
        sizes = photo.get("src") or {}
        photographer = photo.get("photographer")
        if photographer:
            credit = f"Photo by {photographer} on Pexels"
        else:
            credit = None
        candidate = {
            "provider": "pexels",
            "title": photo.get("alt"),
            # Prefer the larger renditions, falling back to the original file.
            "image_url": sizes.get("large2x") or sizes.get("large") or sizes.get("original"),
            "thumbnail_url": sizes.get("medium") or sizes.get("small"),
            "landing_url": photo.get("url"),
            "creator": photographer,
            "creator_url": photo.get("photographer_url"),
            "license": "Pexels License",
            "license_url": "https://www.pexels.com/license/",
            "source": "pexels",
            "attribution": credit,
            "width": photo.get("width"),
            "height": photo.get("height"),
        }
        if include_raw:
            candidate["raw"] = photo
        return candidate

    return [normalize(photo) for photo in payload.get("photos", [])]


def search_unsplash(query: str, limit: int, page: int, include_raw: bool) -> list[dict[str, Any]]:
    """Search the Unsplash photo endpoint and normalize every hit.

    Args:
        query: Search text, sent as ``query``.
        limit: Number of results requested, sent as ``per_page``.
        page: Result page, sent as ``page``.
        include_raw: When true, attach the untouched provider record under
            the ``"raw"`` key of each candidate.

    Returns:
        One normalized candidate dict per entry of the response's
        ``"results"`` list. Unlike the other providers, each candidate also
        carries a ``"download_location"`` key.

    Raises:
        SearchError: If ``UNSPLASH_ACCESS_KEY`` is unset or empty, or
            propagated from ``request_json``.
    """
    access_key = os.environ.get("UNSPLASH_ACCESS_KEY")
    if not access_key:
        raise SearchError("UNSPLASH_ACCESS_KEY is required for provider=unsplash")

    endpoint = build_url(
        "https://api.unsplash.com/search/photos",
        {"query": query, "per_page": limit, "page": page},
    )
    payload = request_json(endpoint, {"Authorization": f"Client-ID {access_key}"})

    def normalize(photo: dict[str, Any]) -> dict[str, Any]:
        # Map one Unsplash record onto the shared candidate schema.
        author = photo.get("user") or {}
        renditions = photo.get("urls") or {}
        photo_links = photo.get("links") or {}
        author_links = author.get("links") or {}
        author_name = author.get("name")
        candidate = {
            "provider": "unsplash",
            "title": photo.get("description") or photo.get("alt_description"),
            "image_url": renditions.get("regular") or renditions.get("full"),
            "thumbnail_url": renditions.get("thumb") or renditions.get("small"),
            "landing_url": photo_links.get("html"),
            "creator": author_name,
            "creator_url": author_links.get("html"),
            "license": "Unsplash License / API Terms",
            "license_url": "https://unsplash.com/license",
            "source": "unsplash",
            "attribution": f"Photo by {author_name} on Unsplash" if author_name else None,
            "download_location": photo_links.get("download_location"),
            "width": photo.get("width"),
            "height": photo.get("height"),
        }
        if include_raw:
            candidate["raw"] = photo
        return candidate

    return [normalize(photo) for photo in payload.get("results", [])]


def search_brave(query: str, limit: int, page: int, include_raw: bool) -> list[dict[str, Any]]:
    """Search the Brave image endpoint and normalize every hit.

    Creator, license and attribution fields are always ``None`` for this
    provider; this function does not read them from the response.

    Args:
        query: Search text, sent as ``q``.
        limit: Number of results requested, sent as ``count``.
        page: 1-based result page, converted to an ``offset`` parameter.
        include_raw: When true, attach the untouched provider record under
            the ``"raw"`` key of each candidate.

    Returns:
        One normalized candidate dict per entry of the response's
        ``"results"`` list.

    Raises:
        SearchError: If ``BRAVE_SEARCH_API_KEY`` is unset or empty, or
            propagated from ``request_json``.
    """
    token = os.environ.get("BRAVE_SEARCH_API_KEY")
    if not token:
        raise SearchError("BRAVE_SEARCH_API_KEY is required for provider=brave")

    # Pages below 1 are clamped to the first page.
    # NOTE(review): the offset is expressed in items (pages * limit); confirm
    # the image endpoint accepts an "offset" parameter with that meaning.
    pages_to_skip = max(page - 1, 0)
    query_params = {
        "q": query,
        "count": limit,
        "offset": pages_to_skip * limit,
        "safesearch": "strict",
        "spellcheck": 1,
    }
    auth_headers = {
        "Accept": "application/json",
        "X-Subscription-Token": token,
    }
    payload = request_json(
        build_url("https://api.search.brave.com/res/v1/images/search", query_params),
        auth_headers,
    )

    def normalize(hit: dict[str, Any]) -> dict[str, Any]:
        # Map one Brave record onto the shared candidate schema.
        props = hit.get("properties") or {}
        thumb = hit.get("thumbnail") or {}
        candidate = {
            "provider": "brave",
            "title": hit.get("title"),
            "image_url": props.get("url") or hit.get("url"),
            "thumbnail_url": thumb.get("src"),
            "landing_url": hit.get("page_url") or hit.get("url"),
            "creator": None,
            "creator_url": None,
            "license": None,
            "license_url": None,
            "source": hit.get("source"),
            "attribution": None,
            "width": props.get("width"),
            "height": props.get("height"),
        }
        if include_raw:
            candidate["raw"] = hit
        return candidate

    return [normalize(hit) for hit in payload.get("results", [])]


# Registry of supported providers: maps the name accepted by ``--provider`` to
# the search function that handles it. Every function takes
# (query, limit, page, include_raw) and returns a list of candidate dicts.
PROVIDERS = {
    "openverse": search_openverse,
    "pexels": search_pexels,
    "unsplash": search_unsplash,
    "brave": search_brave,
}


def _positive_int(text: str) -> int:
    """Convert a command-line value to an int, rejecting anything below 1."""
    try:
        value = int(text)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid integer value: {text!r}") from None
    if value < 1:
        raise argparse.ArgumentTypeError(f"must be a positive integer, got {value}")
    return value


def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the image search script.

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        Namespace with ``query``, ``provider`` (list of names, or ``None``
        when the option was not given), ``limit``, ``page``, ``include_raw``
        and ``pretty``.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including a
            ``--limit`` or ``--page`` value below 1.
    """
    parser = argparse.ArgumentParser(description="Search image providers and output normalized JSON.")
    parser.add_argument("--query", required=True, help="Search query.")
    parser.add_argument(
        "--provider",
        action="append",
        choices=sorted(PROVIDERS),
        help="Provider to search. Repeat for multiple providers. Defaults to openverse.",
    )
    # Zero or negative paging values would be forwarded to the providers as
    # invalid request parameters, so they are rejected at parse time.
    parser.add_argument("--limit", type=_positive_int, default=10, help="Results per provider.")
    parser.add_argument("--page", type=_positive_int, default=1, help="Provider result page.")
    parser.add_argument("--include-raw", action="store_true", help="Include raw provider payloads.")
    parser.add_argument("--pretty", action="store_true", help="Pretty-print JSON.")
    return parser.parse_args(argv)


def main() -> int:
    """Run the requested provider searches and print one JSON report.

    Each selected provider is searched in turn. A ``SearchError`` from one
    provider is recorded in the report's ``"errors"`` list and does not stop
    the remaining providers. The report is written to standard output,
    followed by a newline.

    Returns:
        ``1`` when at least one provider failed and no results were
        collected at all, otherwise ``0``.
    """
    args = parse_args()
    selected = args.provider if args.provider else ["openverse"]
    collected: list[dict[str, Any]] = []
    failures: list[dict[str, str]] = []
    report: dict[str, Any] = {
        "query": args.query,
        # Timestamp is taken before any provider is contacted.
        "retrieved_at": datetime.now(timezone.utc).isoformat(),
        "providers": selected,
        "results": collected,
        "errors": failures,
    }

    for name in selected:
        try:
            hits = PROVIDERS[name](args.query, args.limit, args.page, args.include_raw)
        except SearchError as exc:
            failures.append({"provider": name, "error": str(exc)})
        else:
            collected.extend(hits)

    indent = 2 if args.pretty else None
    json.dump(report, sys.stdout, ensure_ascii=False, indent=indent)
    sys.stdout.write("\n")

    if failures and not collected:
        return 1
    return 0


# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())