文件预览

submit_amazon_global_product.py

查看 Dataify Amazon Global Product 技能包中的文件内容。

文件内容

scripts/submit_amazon_global_product.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import json
import os
import sys
import urllib.error
import urllib.parse
import urllib.request


DEFAULT_FILE_NAME = "{{TasksID}}"
MIN_PYTHON = (3, 6)
DATAIFY_URL = "https://dataify.com"
DASHBOARD_URL = "https://dataify.com/dashboard/"
DEFAULT_PRODUCT_URL = "https://www.amazon.com/dp/B0CHHSFMRL/"
DEFAULT_CATEGORY_URL = "https://www.amazon.com/s?i=luggage-intl-ship"
DEFAULT_DOMAIN = "https://www.amazon.com"
BUILDER_URLS = {
    "product-url": "https://scraperapi.dataify.com/builder",
    "category-url": "https://scraperapi.dataify.com/builder?platform=1",
    "keyword": "https://scraperapi.dataify.com/builder?platform=1",
    "keyword-brand": "https://scraperapi.dataify.com/builder?platform=1",
}
SPIDER_IDS = {
    "product-url": "amazon_global-product_by-url",
    "category-url": "amazon_global-product_by-category-url",
    "keyword": "amazon_global-product_by-keywords",
    "keyword-brand": "amazon_global-product_by-keywords-brand",
}


def ensure_utf8_output():
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if hasattr(sys.stderr, "reconfigure"):
        sys.stderr.reconfigure(encoding="utf-8")


def ensure_python_version():
    if sys.version_info < MIN_PYTHON:
        print(
            "Python {}.{} or newer is required. Run this script with a Python 3 interpreter, for example: python3 scripts/submit_amazon_global_product.py product-url".format(
                MIN_PYTHON[0],
                MIN_PYTHON[1],
            ),
            file=sys.stderr,
        )
        return False
    return True


def require_token(api_token):
    if api_token:
        return True
    print("Missing Dataify API TOKEN. Get one from {}.".format(DATAIFY_URL), file=sys.stderr)
    return False


def normalize_file_name(value):
    file_name = value.strip() if value else ""
    if not file_name:
        raise ValueError("File name cannot be empty.")
    return file_name


def normalize_sort_by(value):
    options = {
        "best sellers": "Best Sellers",
        "Best Sellers": "Best Sellers",
        "newest arrivals": "Newest Arrivals",
        "Newest Arrivals": "Newest Arrivals",
        "average customer review": "Avg. Customer Review",
        "Avg. Customer Review": "Avg. Customer Review",
        "price high to low": "Price: High to Low",
        "Price: High to Low": "Price: High to Low",
        "price low to high": "Price: Low to High",
        "Price: Low to High": "Price: Low to High",
        "featured recommendations": "Featured",
        "Featured": "Featured",
    }
    clean = value.strip() if value else ""
    if clean in options:
        return options[clean]
    raise ValueError("Unsupported sort_by '{}'. Use one of: Best Sellers, Newest Arrivals, Avg. Customer Review, Price: High to Low, Price: Low to High, Featured.".format(value))


def normalize_get_sponsored(value):
    clean = value.strip().lower() if value else ""
    if clean in ("true", "false"):
        return clean
    raise ValueError("get_sponsored must be true or false.")


def submit_builder(builder_url, spider_id, api_token, spider_parameters, file_name):
    form = {
        "spider_name": "amazon.com",
        "spider_id": spider_id,
        "spider_parameters": json.dumps(spider_parameters, separators=(",", ":"), ensure_ascii=False),
        "spider_errors": "true",
        "file_name": file_name,
    }
    body = urllib.parse.urlencode(form).encode("utf-8")
    request = urllib.request.Request(
        builder_url,
        data=body,
        headers={
            "Authorization": "Bearer {}".format(api_token),
            "Content-Type": "application/x-www-form-urlencoded",
        },
        method="POST",
    )

    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            raw = response.read().decode("utf-8")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise RuntimeError("Builder request failed with HTTP {}: {}".format(exc.code, detail))
    except urllib.error.URLError as exc:
        raise RuntimeError("Builder request failed: {}".format(exc.reason))

    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        raise RuntimeError("Builder returned non-JSON response: {}".format(raw))

    task_id = payload.get("data", {}).get("task_id")
    if not task_id:
        raise RuntimeError("Builder did not return task_id. Response: {}".format(json.dumps(payload, ensure_ascii=False)))
    return task_id


def add_common_args(parser):
    parser.add_argument("--api-token", default=os.environ.get("DATAIFY_API_TOKEN"), help="Dataify token. Defaults to DATAIFY_API_TOKEN.")
    parser.add_argument("--file-name", default=DEFAULT_FILE_NAME, help="Builder file_name value. Defaults to {{TasksID}}.")


def handle_product_url(args):
    url = args.url.strip()
    if not url:
        raise ValueError("URL cannot be empty.")
    file_name = normalize_file_name(args.file_name)
    parameters = [{"url": url}]
    summary = {"url": url, "file_name": file_name}
    return BUILDER_URLS["product-url"], SPIDER_IDS["product-url"], parameters, file_name, summary


def handle_category_url(args):
    url = args.url.strip()
    if not url:
        raise ValueError("URL cannot be empty.")
    if args.maximum < 0:
        raise ValueError("Maximum must be greater than or equal to 0.")
    sort_by = normalize_sort_by(args.sort_by)
    get_sponsored = normalize_get_sponsored(args.get_sponsored)
    file_name = normalize_file_name(args.file_name)
    parameters = [{
        "url": url,
        "maximum": str(args.maximum),
        "sort_by": sort_by,
        "get_sponsored": get_sponsored,
    }]
    summary = {
        "url": url,
        "maximum": str(args.maximum),
        "sort_by": sort_by,
        "get_sponsored": get_sponsored,
        "file_name": file_name,
    }
    return BUILDER_URLS["category-url"], SPIDER_IDS["category-url"], parameters, file_name, summary


def handle_keyword(args):
    keyword = args.keyword.strip()
    if not keyword:
        raise ValueError("Keyword cannot be empty.")
    domain = args.domain.strip()
    if not domain:
        raise ValueError("Domain cannot be empty.")
    if args.lowest_price < 0:
        raise ValueError("Lowest price must be greater than or equal to 0.")
    if args.highest_price < 0:
        raise ValueError("Highest price must be greater than or equal to 0.")
    if args.highest_price < args.lowest_price:
        raise ValueError("Highest price cannot be less than lowest price.")
    if args.page_turning < 0:
        raise ValueError("Page turning must be greater than or equal to 0.")
    file_name = normalize_file_name(args.file_name)
    parameters = [{
        "keyword": keyword,
        "domain": domain,
        "lowest_price": str(args.lowest_price),
        "highest_price": str(args.highest_price),
        "page_turning": str(args.page_turning),
    }]
    summary = {
        "keyword": keyword,
        "domain": domain,
        "lowest_price": str(args.lowest_price),
        "highest_price": str(args.highest_price),
        "page_turning": str(args.page_turning),
        "file_name": file_name,
    }
    return BUILDER_URLS["keyword"], SPIDER_IDS["keyword"], parameters, file_name, summary


def handle_keyword_brand(args):
    keyword = args.keyword.strip()
    if not keyword:
        raise ValueError("Keyword cannot be empty.")
    brands = args.brands.strip()
    if not brands:
        raise ValueError("Brands cannot be empty.")
    if args.page_turning < 0:
        raise ValueError("Page turning must be greater than or equal to 0.")
    file_name = normalize_file_name(args.file_name)
    parameters = [{
        "keyword": keyword,
        "brands": brands,
        "page_turning": str(args.page_turning),
    }]
    summary = {
        "keyword": keyword,
        "brands": brands,
        "page_turning": str(args.page_turning),
        "file_name": file_name,
    }
    return BUILDER_URLS["keyword-brand"], SPIDER_IDS["keyword-brand"], parameters, file_name, summary


def build_parser():
    parser = argparse.ArgumentParser(description="Submit Dataify Amazon global product Builder tasks.")
    subparsers = parser.add_subparsers(dest="mode")
    subparsers.required = True

    product_url = subparsers.add_parser("product-url", help="Collect global product details by product URL.")
    product_url.add_argument("--url", default=DEFAULT_PRODUCT_URL, help="Amazon product URL.")
    add_common_args(product_url)
    product_url.set_defaults(handler=handle_product_url)

    category_url = subparsers.add_parser("category-url", help="Collect global product details from a category URL.")
    category_url.add_argument("--url", default=DEFAULT_CATEGORY_URL, help="Amazon category URL.")
    category_url.add_argument("--maximum", default=5, type=int, help="Maximum number of products to collect. Defaults to 5.")
    category_url.add_argument("--sort-by", default="Best Sellers", help="Sort option. Defaults to Best Sellers.")
    category_url.add_argument("--get-sponsored", default="true", help="Whether to include sponsored products: true or false. Defaults to true.")
    add_common_args(category_url)
    category_url.set_defaults(handler=handle_category_url)

    keyword = subparsers.add_parser("keyword", help="Collect global product details from a keyword search.")
    keyword.add_argument("--keyword", default="coffee", help="Amazon search keyword. Defaults to coffee.")
    keyword.add_argument("--domain", default=DEFAULT_DOMAIN, help="Amazon domain. Defaults to https://www.amazon.com.")
    keyword.add_argument("--lowest-price", default=20, type=int, help="Lowest price. Defaults to 20.")
    keyword.add_argument("--highest-price", default=50, type=int, help="Highest price. Defaults to 50.")
    keyword.add_argument("--page-turning", default=2, type=int, help="Number of pages to collect. Defaults to 2.")
    add_common_args(keyword)
    keyword.set_defaults(handler=handle_keyword)

    keyword_brand = subparsers.add_parser("keyword-brand", help="Collect global product details from a keyword and brand filter.")
    keyword_brand.add_argument("--keyword", default="shirts", help="Amazon search keyword. Defaults to shirts.")
    keyword_brand.add_argument("--brands", default="Adidas", help="Brand filter. Defaults to Adidas.")
    keyword_brand.add_argument("--page-turning", default=2, type=int, help="Number of pages to collect. Defaults to 2.")
    add_common_args(keyword_brand)
    keyword_brand.set_defaults(handler=handle_keyword_brand)

    return parser


def main():
    ensure_utf8_output()
    if not ensure_python_version():
        return 2

    parser = build_parser()
    args = parser.parse_args()

    if not require_token(args.api_token):
        return 2

    try:
        builder_url, spider_id, spider_parameters, file_name, summary = args.handler(args)
        task_id = submit_builder(builder_url, spider_id, args.api_token, spider_parameters, file_name)
    except ValueError as exc:
        print(str(exc), file=sys.stderr)
        return 2
    except RuntimeError as exc:
        print(str(exc), file=sys.stderr)
        return 1

    output = {
        "task_id": task_id,
        "mode": args.mode,
        "spider_id": spider_id,
        "dashboard_url": DASHBOARD_URL,
        "message": "Task submitted. Visit {} to view results.".format(DASHBOARD_URL),
    }
    output.update(summary)
    print(json.dumps(output, ensure_ascii=False, indent=2))
    return 0


if __name__ == "__main__":
    sys.exit(main())