文件预览

delivery.py

查看 Audible Goodreads Deal Scout 技能包中的文件内容。

文件内容

audible_goodreads_deal_scout/delivery.py

from __future__ import annotations

import json
import os
import shlex
import shutil
import subprocess
from pathlib import Path
from typing import Any

from .constants import DEFAULT_DELIVERY_POLICY, DEFAULT_FRESHNESS_DAYS, DEFAULT_THRESHOLD, SUPPORTED_DELIVERY_POLICIES, SUPPORTED_PRIVACY_MODES
from .settings import (
    config_template,
    default_storage_dir,
    load_config,
    resolve_notes_text,
    skill_root,
    validate_marketplace,
    validate_timezone,
)
from .shared import atomic_write_text, ensure_python_version, normalize_space, write_json_atomic


def resolve_openclaw_bin(openclaw_bin: str = "openclaw") -> str:
    env_bin = normalize_space(os.environ.get("OPENCLAW_BIN"))
    requested = normalize_space(openclaw_bin) or "openclaw"
    candidates: list[str] = []
    if env_bin and requested == "openclaw":
        candidates.append(env_bin)
    candidates.append(requested)
    home = Path.home()
    candidates.extend(
        [
            str(home / ".npm-global" / "bin" / "openclaw"),
            str(home / ".local" / "bin" / "openclaw"),
            "/usr/local/bin/openclaw",
        ]
    )
    for candidate in candidates:
        if not candidate:
            continue
        expanded = str(Path(candidate).expanduser()) if "/" in candidate else candidate
        if "/" in expanded:
            if Path(expanded).exists() and os.access(expanded, os.X_OK):
                return expanded
            continue
        resolved = shutil.which(expanded)
        if resolved:
            return resolved
    return requested


def build_cron_message(config_path: Path, state_file: Path) -> str:
    return (
        "Use $audible-goodreads-deal-scout to evaluate the current Audible daily promotion "
        f"with config at {config_path} in scheduled mode using state file {state_file}."
    )


def build_cron_command(
    *,
    openclaw_bin: str,
    spec: dict[str, str],
    config_path: Path,
    state_file: Path,
    name: str | None = None,
    cron_expr: str | None = None,
) -> list[str]:
    validate_timezone(spec)
    resolved_openclaw_bin = resolve_openclaw_bin(openclaw_bin)
    return [
        resolved_openclaw_bin,
        "--no-color",
        "cron",
        "add",
        "--name",
        name or f"Audible Goodreads Deal ({spec['key'].upper()})",
        "--cron",
        cron_expr or spec["defaultCron"],
        "--tz",
        spec["timezone"],
        "--session",
        "isolated",
        "--message",
        build_cron_message(config_path, state_file),
        "--announce",
        "--json",
    ]


def list_cron_jobs(openclaw_bin: str) -> list[dict[str, Any]]:
    resolved_openclaw_bin = resolve_openclaw_bin(openclaw_bin)
    proc = subprocess.run(
        [resolved_openclaw_bin, "--no-color", "cron", "list", "--json"],
        capture_output=True,
        text=True,
        timeout=30,
    )
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "openclaw cron list failed")
    try:
        payload = json.loads(proc.stdout.strip() or "{}")
    except Exception:
        return []
    if isinstance(payload, list):
        return [item for item in payload if isinstance(item, dict)]
    if isinstance(payload, dict):
        jobs = payload.get("jobs")
        if isinstance(jobs, list):
            return [item for item in jobs if isinstance(item, dict)]
    return []


def find_matching_cron_job(
    jobs: list[dict[str, Any]],
    *,
    name: str,
    cron_expr: str,
    timezone_name: str,
    message: str,
) -> dict[str, Any] | None:
    for job in jobs:
        job_name = normalize_space(str(job.get("name") or ""))
        schedule = job.get("schedule") if isinstance(job.get("schedule"), dict) else {}
        payload = job.get("payload") if isinstance(job.get("payload"), dict) else {}
        job_cron = normalize_space(str(schedule.get("cron") or schedule.get("expr") or ""))
        if (
            job_name == name
            and job_cron == cron_expr
            and normalize_space(str(schedule.get("tz") or "")) == timezone_name
            and normalize_space(str(payload.get("message") or payload.get("text") or "")) == message
        ):
            return job
    return None


def register_cron_job(
    *,
    openclaw_bin: str,
    spec: dict[str, str],
    config_path: Path,
    state_file: Path,
    name: str | None = None,
    cron_expr: str | None = None,
) -> dict[str, Any]:
    job_name = name or f"Audible Goodreads Deal ({spec['key'].upper()})"
    schedule = cron_expr or spec["defaultCron"]
    message = build_cron_message(config_path, state_file)
    jobs = list_cron_jobs(openclaw_bin)
    existing = find_matching_cron_job(
        jobs,
        name=job_name,
        cron_expr=schedule,
        timezone_name=spec["timezone"],
        message=message,
    )
    command = build_cron_command(
        openclaw_bin=openclaw_bin,
        spec=spec,
        config_path=config_path,
        state_file=state_file,
        name=job_name,
        cron_expr=schedule,
    )
    if existing:
        return {"ok": True, "created": False, "existingJob": existing, "command": command}
    proc = subprocess.run(command, capture_output=True, text=True, timeout=30)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "openclaw cron add failed")
    payload = json.loads(proc.stdout.strip() or "{}")
    return {"ok": True, "created": True, "job": payload.get("job"), "command": command}


def _next_step(label: str, description: str, argv: list[str], *, optional: bool = False) -> dict[str, Any]:
    return {
        "label": label,
        "description": description,
        "optional": optional,
        "argv": argv,
        "command": shlex.join(argv),
    }


def build_setup_next_steps(
    *,
    config_path: Path,
    storage_dir: Path,
    spec: dict[str, str],
    config_payload: dict[str, Any],
) -> list[dict[str, Any]]:
    wrapper = str(skill_root() / "scripts" / "audible-goodreads-deal-scout.sh")
    launcher = ["sh", wrapper]
    config_arg = str(config_path)
    steps = [
        _next_step(
            "doctor",
            "Validate config, local files, wrapper, OpenClaw binary, delivery, cron, and auth readiness.",
            [*launcher, "doctor", "--config-path", config_arg],
        ),
        _next_step(
            "check-daily-deal",
            "Prepare today's Audible daily promotion result for the OpenClaw skill runtime.",
            [*launcher, "prepare", "--config-path", config_arg],
        ),
    ]
    if config_payload.get("goodreadsCsvPath"):
        steps.append(
            _next_step(
                "scan-want-to-read",
                "Run a small Want-to-Read discount scan to verify Goodreads CSV and Audible matching.",
                [*launcher, "scan-want-to-read", "--config-path", config_arg, "--limit", "40"],
            )
        )
    auth_path = normalize_space(str(config_payload.get("audibleAuthPath") or ""))
    if auth_path:
        steps.append(
            _next_step(
                "check-audible-auth",
                "Check saved Audible auth readiness and file permissions without printing tokens.",
                [*launcher, "audible-auth-status", "--auth-path", auth_path],
                optional=True,
            )
        )
    else:
        suggested_auth_path = str(storage_dir / "audible-auth.json")
        steps.append(
            _next_step(
                "optional-audible-auth",
                "Optional: start external-browser Audible auth for member-visible Want-to-Read prices.",
                [*launcher, "audible-auth-start", "--auth-path", suggested_auth_path, "--audible-marketplace", spec["key"]],
                optional=True,
            )
        )
    return steps


def setup_configuration(
    options: dict[str, Any],
    *,
    openclaw_bin: str = "openclaw",
    register_cron: bool = False,
) -> dict[str, Any]:
    ensure_python_version()
    marketplace = normalize_space(str(options.get("audibleMarketplace") or "us")).lower() or "us"
    spec = validate_marketplace(marketplace)
    storage_dir = Path(str(options.get("storageDir") or default_storage_dir())).expanduser()
    config_path = Path(str(options.get("configPath") or storage_dir / "config.json")).expanduser()
    state_file = Path(str(options.get("stateFile") or storage_dir / "state.json")).expanduser()
    preferences_path = Path(str(options.get("preferencesPath") or storage_dir / "preferences.md")).expanduser()
    threshold = float(options.get("threshold") or DEFAULT_THRESHOLD)
    privacy_mode = normalize_space(str(options.get("privacyMode") or "normal")).lower() or "normal"
    notes_file = normalize_space(str(options.get("notesFile") or ""))
    notes_text = resolve_notes_text(notes_file, str(options.get("notesText") or ""))
    goodreads_csv = normalize_space(str(options.get("goodreadsCsvPath") or ""))
    daily_enabled = bool(options.get("dailyAutomation"))
    cron_expr = normalize_space(str(options.get("dailyCron") or spec["defaultCron"]))
    artifact_dir = Path(str(options.get("artifactDir") or storage_dir / "artifacts" / "current")).expanduser()
    delivery_channel = normalize_space(str(options.get("deliveryChannel") or ""))
    delivery_target = normalize_space(str(options.get("deliveryTarget") or ""))
    delivery_policy = normalize_delivery_policy(str(options.get("deliveryPolicy") or DEFAULT_DELIVERY_POLICY))
    if notes_text:
        notes_text = notes_text.rstrip() + "\n"
    config_payload = config_template(
        audibleMarketplace=spec["key"],
        threshold=threshold,
        goodreadsCsvPath=goodreads_csv or None,
        preferencesPath=str(preferences_path) if notes_text else None,
        privacyMode=privacy_mode if privacy_mode in SUPPORTED_PRIVACY_MODES else "normal",
        stateFile=str(state_file) if daily_enabled else None,
        artifactDir=str(artifact_dir),
        freshnessDays=int(options.get("freshnessDays") or DEFAULT_FRESHNESS_DAYS),
        csvColumns=options.get("csvColumns") or {},
        audibleDealUrl=options.get("audibleDealUrl") or None,
        audibleFetchBackend=options.get("audibleFetchBackend") or "auto",
        audibleAuthPath=options.get("audibleAuthPath") or None,
        dailyCron=cron_expr if daily_enabled else None,
        deliveryChannel=delivery_channel or None,
        deliveryTarget=delivery_target or None,
        deliveryPolicy=delivery_policy,
    )
    cron_command = None
    if daily_enabled:
        cron_command = build_cron_command(
            openclaw_bin=openclaw_bin,
            spec=spec,
            config_path=config_path,
            state_file=state_file,
            cron_expr=cron_expr,
        )

    manual_result = {
        "ok": True,
        "written": False,
        "configPath": str(config_path),
        "preferencesPath": str(preferences_path) if notes_text else None,
        "stateFile": str(state_file) if daily_enabled else None,
        "config": config_payload,
        "configJson": json.dumps(config_payload, indent=2, sort_keys=True, ensure_ascii=False),
        "cronCommand": cron_command,
        "marketplace": spec["key"],
        "nextSteps": build_setup_next_steps(
            config_path=config_path,
            storage_dir=storage_dir,
            spec=spec,
            config_payload=config_payload,
        ),
    }

    try:
        config_path.parent.mkdir(parents=True, exist_ok=True)
        write_json_atomic(config_path, config_payload)
        if notes_text:
            atomic_write_text(preferences_path, notes_text)
    except OSError:
        return {**manual_result, "manualOnly": True}

    result = {**manual_result, "written": True, "manualOnly": False}
    if daily_enabled and register_cron:
        result["cronRegistration"] = register_cron_job(
            openclaw_bin=openclaw_bin,
            spec=spec,
            config_path=config_path,
            state_file=state_file,
            cron_expr=cron_expr,
        )
    return result


def resolve_delivery_settings(
    *,
    config_path: Path | None,
    delivery_channel: str | None = None,
    delivery_target: str | None = None,
) -> tuple[Path, str, str, str]:
    path, config = load_config(config_path)
    channel = normalize_space(str(delivery_channel or config.get("deliveryChannel") or ""))
    target = normalize_space(str(delivery_target or config.get("deliveryTarget") or ""))
    policy = normalize_delivery_policy(str(config.get("deliveryPolicy") or DEFAULT_DELIVERY_POLICY))
    if not channel:
        raise RuntimeError(
            f"No delivery channel configured. Set deliveryChannel in {path} or pass --delivery-channel."
        )
    if not target:
        raise RuntimeError(
            f"No delivery target configured. Set deliveryTarget in {path} or pass --delivery-target."
        )
    return path, channel, target, policy


def resolve_delivery_policy(
    *,
    config_path: Path | None,
    delivery_policy: str | None = None,
) -> tuple[Path, str]:
    path, config = load_config(config_path)
    policy = normalize_delivery_policy(delivery_policy or str(config.get("deliveryPolicy") or DEFAULT_DELIVERY_POLICY))
    return path, policy


def normalize_delivery_policy(value: str | None) -> str:
    normalized = normalize_space(str(value or "")).lower() or DEFAULT_DELIVERY_POLICY
    if normalized not in SUPPORTED_DELIVERY_POLICIES:
        return DEFAULT_DELIVERY_POLICY
    return normalized


def deliver_message(
    *,
    message_text: str,
    config_path: Path | None,
    delivery_channel: str | None = None,
    delivery_target: str | None = None,
    openclaw_bin: str = "openclaw",
    dry_run: bool = False,
) -> dict[str, Any]:
    path, channel, target, policy = resolve_delivery_settings(
        config_path=config_path,
        delivery_channel=delivery_channel,
        delivery_target=delivery_target,
    )
    normalized_message = str(message_text or "").strip()
    if not normalized_message:
        raise RuntimeError("Cannot deliver an empty message.")
    command = [
        resolve_openclaw_bin(openclaw_bin),
        "message",
        "send",
        "--channel",
        channel,
        "--target",
        target,
        "--message",
        normalized_message,
        "--json",
    ]
    if dry_run:
        command.insert(-1, "--dry-run")
    proc = subprocess.run(command, capture_output=True, text=True, timeout=60)
    stdout = (proc.stdout or "").strip()
    stderr = (proc.stderr or "").strip()
    if proc.returncode != 0:
        raise RuntimeError(stderr or stdout or "openclaw message send failed")
    payload = json.loads(stdout or "{}")
    return {
        "ok": True,
        "configPath": str(path),
        "deliveryChannel": channel,
        "deliveryTarget": target,
        "deliveryPolicy": policy,
        "dryRun": dry_run,
        "payload": payload.get("payload") if isinstance(payload, dict) else payload,
        "raw": payload,
    }