文件预览

cli.py

查看 Dyson Fan Control 技能包中的文件内容。

文件内容

src/dyson_cli/cli.py

"""Main CLI entry point for dyson-cli."""

import json
import sys
import time
from typing import Optional

import click
from rich.console import Console
from rich.table import Table

from .config import (
    CONFIG_FILE,
    get_device,
    load_config,
    save_config,
    set_default_device,
)

console = Console()

# Device type mapping (from libdyson)
DEVICE_TYPE_NAMES = {
    "455": "Dyson Pure Hot+Cool Link",
    "469": "Dyson Pure Cool Link Desk",
    "475": "Dyson Pure Cool Link Tower",
    "520": "Dyson Pure Cool Desk",
    "527": "Dyson Pure Hot+Cool",
    "527K": "Dyson Purifier Hot+Cool Formaldehyde (HP09)",
    "438": "Dyson Pure Cool Tower",
    "358": "Dyson Pure Humidify+Cool",
    "358E": "Dyson Pure Humidify+Cool Formaldehyde",
    "527E": "Dyson Purifier Hot+Cool Formaldehyde",
    "664": "Dyson Purifier Big+Quiet Formaldehyde",
}


def get_device_type_name(product_type: str) -> str:
    """Get human-readable device type name."""
    return DEVICE_TYPE_NAMES.get(product_type, f"Dyson Device ({product_type})")


@click.group()
@click.version_option()
def cli():
    """Control Dyson devices from the command line."""
    pass


@cli.command()
@click.option("--email", prompt="Dyson account email", help="Your Dyson account email")
@click.option(
    "--region",
    type=click.Choice(["US", "CA", "CN", "GB", "AU", "DE", "FR", "IT", "ES", "NL", "IE"]),
    default="GB",
    help="Dyson account region (country code)",
)
def setup(email: str, region: str):
    """Set up device credentials via Dyson account."""
    try:
        from libdyson.cloud.account import DysonAccount
        from libdyson.exceptions import DysonLoginFailure, DysonServerError
    except ImportError:
        console.print("[red]Error: libdyson not installed. Run: pip install libdyson[/red]")
        sys.exit(1)

    account = DysonAccount()

    console.print(f"Sending OTP to {email}...")
    try:
        verify_func = account.login_email_otp(email, region)
    except DysonServerError as e:
        console.print(f"[red]Server error. Try a different region (e.g., GB, US, DE)[/red]")
        sys.exit(1)
    except DysonLoginFailure as e:
        console.print(f"[red]Login failed: {e}[/red]")
        sys.exit(1)

    console.print("[green]OTP sent! Check your email.[/green]")
    otp = click.prompt("Enter the OTP code from your email")
    password = click.prompt("Enter your Dyson account password", hide_input=True)

    console.print("Verifying...")
    try:
        verify_func(otp, password)
    except DysonLoginFailure as e:
        console.print(f"[red]Verification failed: {e}[/red]")
        sys.exit(1)
    except Exception as e:
        console.print(f"[red]Error: {e}[/red]")
        sys.exit(1)

    console.print("Fetching devices...")
    devices = account.devices()

    if not devices:
        console.print("[yellow]No devices found in your Dyson account.[/yellow]")
        sys.exit(0)

    config = load_config()
    config["devices"] = []

    for device in devices:
        device_info = {
            "name": device.name,
            "serial": device.serial,
            "credential": device.credential,
            "product_type": device.product_type,
        }
        config["devices"].append(device_info)
        console.print(
            f"  Found: {device.name} ({get_device_type_name(device.product_type)})"
        )

    if config["devices"] and not config.get("default_device"):
        config["default_device"] = config["devices"][0]["name"]

    save_config(config)
    console.print(f"\n[green]✓ Saved {len(devices)} device(s) to {CONFIG_FILE}[/green]")


@cli.command("list")
@click.option("--check", "-c", is_flag=True, help="Check if devices are reachable")
def list_devices(check: bool):
    """List configured devices."""
    config = load_config()
    devices = config.get("devices", [])

    if not devices:
        console.print("[yellow]No devices configured. Run 'dyson setup' first.[/yellow]")
        return

    table = Table(title="Configured Devices")
    table.add_column("Name", style="cyan")
    table.add_column("Type", style="green")
    table.add_column("IP", style="dim")
    table.add_column("Default", style="yellow")
    if check:
        table.add_column("Status", style="green")

    default = config.get("default_device")
    for device in devices:
        is_default = "✓" if device.get("name") == default else ""
        ip = device.get("ip", "Not configured")
        
        status = None
        if check and device.get("ip"):
            import socket
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(2)
            try:
                sock.connect((device["ip"], 1883))
                status = "[green]Online[/green]"
            except:
                status = "[red]Offline[/red]"
            finally:
                sock.close()
        
        row = [
            device.get("name", "Unknown"),
            get_device_type_name(device.get("product_type", "")),
            ip,
            is_default,
        ]
        if check:
            row.append(status or "[dim]Skipped[/dim]")
        table.add_row(*row)

    console.print(table)


@cli.command()
@click.option("--device", "-d", help="Device name or serial")
@click.option("--json", "as_json", is_flag=True, help="Output as JSON")
def status(device: Optional[str], as_json: bool):
    """Show device status."""
    device_config = get_device(device)
    if not device_config:
        console.print("[red]No device found. Run 'dyson setup' first.[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
        from libdyson import DEVICE_TYPE_PURE_HOT_COOL, DEVICE_TYPE_PURE_HOT_COOL_LINK
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    ip = device_config.get("ip")
    if not ip:
        console.print("[yellow]No IP address configured. Trying auto-discovery...[/yellow]")
        try:
            from libdyson.discovery import DysonDiscovery

            discovery = DysonDiscovery()
            discovery.start_discovery()
            time.sleep(5)
            discovery.stop_discovery()

            discovered = discovery.devices
            for serial, info in discovered.items():
                if serial == device_config["serial"]:
                    ip = info.address
                    device_config["ip"] = ip
                    config = load_config()
                    for d in config["devices"]:
                        if d["serial"] == serial:
                            d["ip"] = ip
                    save_config(config)
                    console.print(f"[green]Discovered device at {ip}[/green]")
                    break
        except Exception as e:
            console.print(f"[red]Discovery failed: {e}[/red]")

    if not ip:
        console.print("[red]Could not find device IP. Please add 'ip' to config manually.[/red]")
        sys.exit(1)

    console.print(f"Connecting to {device_config['name']} at {ip}...")

    try:
        dyson_device.connect(ip)
        time.sleep(2)  # Wait for state update

        # Raw state for JSON output
        raw_state = {
            "name": device_config["name"],
            "serial": device_config["serial"],
            "type": get_device_type_name(device_config["product_type"]),
            "connected": dyson_device.is_connected,
            "is_on": dyson_device.is_on if hasattr(dyson_device, "is_on") else None,
            "auto_mode": getattr(dyson_device, "auto_mode", None),
            "speed": getattr(dyson_device, "speed", None),
            "oscillation": getattr(dyson_device, "oscillation", None),
            "oscillation_angle_low": getattr(dyson_device, "oscillation_angle_low", None),
            "oscillation_angle_high": getattr(dyson_device, "oscillation_angle_high", None),
            "night_mode": getattr(dyson_device, "night_mode", None),
            "heat_mode_is_on": getattr(dyson_device, "heat_mode_is_on", None),
            "heat_target": getattr(dyson_device, "heat_target", None),
            "temperature": getattr(dyson_device, "temperature", None),
            "humidity": getattr(dyson_device, "humidity", None),
        }

        dyson_device.disconnect()

        if as_json:
            console.print(json.dumps(raw_state, indent=2))
        else:
            table = Table(title=f"{device_config['name']}")
            table.add_column("", style="cyan")
            table.add_column("", style="green")

            # Connected
            connected = "[green]✓[/green]" if raw_state["connected"] else "[red]✗[/red]"
            table.add_row("Connected", connected)

            # Fan speed
            if raw_state.get("auto_mode"):
                fan_display = "Auto"
            elif raw_state.get("speed") is not None:
                fan_display = str(raw_state["speed"])
            else:
                fan_display = "[dim]Off[/dim]"
            table.add_row("Fan Speed", fan_display)

            # Oscillation
            if raw_state.get("oscillation"):
                angle_low = raw_state.get("oscillation_angle_low", 0)
                angle_high = raw_state.get("oscillation_angle_high", 0)
                angle_range = angle_high - angle_low
                osc_display = f"{angle_range}° ({angle_low}°–{angle_high}°)"
            else:
                osc_display = "[dim]Off[/dim]"
            table.add_row("Oscillation", osc_display)

            # Heat (Hot+Cool models)
            if raw_state.get("heat_mode_is_on") is not None:
                if raw_state["heat_mode_is_on"]:
                    target_k = raw_state.get("heat_target", 293)
                    target_c = target_k - 273
                    heat_display = f"On → {target_c:.0f}°C"
                else:
                    heat_display = "[dim]Off[/dim]"
                table.add_row("Heat", heat_display)

            # Environment
            if raw_state.get("temperature") is not None:
                temp_c = raw_state["temperature"] - 273
                table.add_row("Temperature", f"{temp_c:.1f}°C")
            
            if raw_state.get("humidity") is not None:
                table.add_row("Humidity", f"{raw_state['humidity']}%")

            # Night mode (quieter + dims display)
            night = "[green]✓[/green]" if raw_state.get("night_mode") else "[dim]Off[/dim]"
            table.add_row("Night Mode", night)

            console.print(table)

    except Exception as e:
        console.print(f"[red]Connection failed: {e}[/red]")
        sys.exit(1)


@cli.command()
@click.option("--device", "-d", help="Device name or serial")
def on(device: Optional[str]):
    """Turn device on."""
    _control_power(device, True)


@cli.command()
@click.option("--device", "-d", help="Device name or serial")
def off(device: Optional[str]):
    """Turn device off."""
    _control_power(device, False)


def _control_power(device_name: Optional[str], power_on: bool):
    """Control device power."""
    device_config = get_device(device_name)
    if not device_config:
        console.print("[red]No device found.[/red]")
        sys.exit(1)

    ip = device_config.get("ip")
    if not ip:
        console.print("[red]No IP configured. Run 'dyson status' first to discover.[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    try:
        dyson_device.connect(ip)
        time.sleep(1)

        if power_on:
            dyson_device.turn_on()
            console.print(f"[green]✓ {device_config['name']} turned on[/green]")
        else:
            dyson_device.turn_off()
            console.print(f"[green]✓ {device_config['name']} turned off[/green]")

        dyson_device.disconnect()

    except Exception as e:
        console.print(f"[red]Failed: {e}[/red]")
        sys.exit(1)


@cli.group()
def fan():
    """Fan control commands."""
    pass


@fan.command("speed")
@click.argument("speed")
@click.option("--device", "-d", help="Device name or serial")
def fan_speed(speed: str, device: Optional[str]):
    """Set fan speed (1-10 or 'auto')."""
    device_config = get_device(device)
    if not device_config:
        console.print("[red]No device found.[/red]")
        sys.exit(1)

    ip = device_config.get("ip")
    if not ip:
        console.print("[red]No IP configured.[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    try:
        dyson_device.connect(ip)
        time.sleep(1)

        if speed.lower() == "auto":
            dyson_device.enable_auto_mode()
            console.print("[green]✓ Fan set to auto[/green]")
        else:
            speed_int = int(speed)
            if not 1 <= speed_int <= 10:
                console.print("[red]Speed must be 1-10 or 'auto'[/red]")
                sys.exit(1)
            dyson_device.disable_auto_mode()
            dyson_device.set_speed(speed_int)
            console.print(f"[green]✓ Fan speed set to {speed_int}[/green]")

        dyson_device.disconnect()

    except ValueError:
        console.print("[red]Speed must be a number 1-10 or 'auto'[/red]")
        sys.exit(1)
    except Exception as e:
        console.print(f"[red]Failed: {e}[/red]")
        sys.exit(1)


@fan.command("oscillate")
@click.argument("state", type=click.Choice(["on", "off"]))
@click.option("--angle", "-a", type=int, help="Oscillation range in degrees (45, 90, 180, or 350)")
@click.option("--device", "-d", help="Device name or serial")
def fan_oscillate(state: str, angle: Optional[int], device: Optional[str]):
    """Enable or disable oscillation. Use --angle to set range (e.g., 90 for 90 degrees)."""
    device_config = get_device(device)
    if not device_config:
        console.print("[red]No device found.[/red]")
        sys.exit(1)

    ip = device_config.get("ip")
    if not ip:
        console.print("[red]No IP configured.[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    try:
        dyson_device.connect(ip)
        time.sleep(1)

        if state == "on":
            if angle:
                # Center the oscillation around current position (or 180 degrees)
                center = 180
                half = angle // 2
                angle_low = max(5, center - half)
                angle_high = min(355, center + half)
                dyson_device.enable_oscillation(angle_low=angle_low, angle_high=angle_high)
                console.print(f"[green]✓ Oscillation enabled ({angle}° range)[/green]")
            else:
                dyson_device.enable_oscillation()
                console.print("[green]✓ Oscillation enabled[/green]")
        else:
            dyson_device.disable_oscillation()
            console.print("[green]✓ Oscillation disabled[/green]")

        dyson_device.disconnect()

    except Exception as e:
        console.print(f"[red]Failed: {e}[/red]")
        sys.exit(1)


@cli.group()
def heat():
    """Heat control commands (Hot+Cool models only)."""
    pass


@heat.command("on")
@click.option("--device", "-d", help="Device name or serial")
def heat_on(device: Optional[str]):
    """Enable heat mode."""
    _control_heat(device, True)


@heat.command("off")
@click.option("--device", "-d", help="Device name or serial")
def heat_off(device: Optional[str]):
    """Disable heat mode."""
    _control_heat(device, False)


def _control_heat(device_name: Optional[str], enable: bool):
    """Control heat mode."""
    device_config = get_device(device_name)
    if not device_config:
        console.print("[red]No device found.[/red]")
        sys.exit(1)

    ip = device_config.get("ip")
    if not ip:
        console.print("[red]No IP configured.[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    try:
        dyson_device.connect(ip)
        time.sleep(1)

        if not hasattr(dyson_device, "enable_heat_mode"):
            console.print("[red]This device does not support heat mode.[/red]")
            sys.exit(1)

        if enable:
            dyson_device.enable_heat_mode()
            console.print("[green]✓ Heat mode enabled[/green]")
        else:
            dyson_device.disable_heat_mode()
            console.print("[green]✓ Heat mode disabled[/green]")

        dyson_device.disconnect()

    except Exception as e:
        console.print(f"[red]Failed: {e}[/red]")
        sys.exit(1)


@heat.command("target")
@click.argument("temperature", type=int)
@click.option("--device", "-d", help="Device name or serial")
def heat_target(temperature: int, device: Optional[str]):
    """Set target temperature in Celsius (1-37)."""
    device_config = get_device(device)
    if not device_config:
        console.print("[red]No device found.[/red]")
        sys.exit(1)

    ip = device_config.get("ip")
    if not ip:
        console.print("[red]No IP configured.[/red]")
        sys.exit(1)

    if not 1 <= temperature <= 37:
        console.print("[red]Temperature must be between 1 and 37°C[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    try:
        dyson_device.connect(ip)
        time.sleep(1)

        if not hasattr(dyson_device, "set_heat_target"):
            console.print("[red]This device does not support heat target.[/red]")
            sys.exit(1)

        # libdyson uses Kelvin internally
        dyson_device.set_heat_target(temperature + 273)
        console.print(f"[green]✓ Target temperature set to {temperature}°C[/green]")

        dyson_device.disconnect()

    except Exception as e:
        console.print(f"[red]Failed: {e}[/red]")
        sys.exit(1)


@cli.command()
@click.argument("state", type=click.Choice(["on", "off"]))
@click.option("--device", "-d", help="Device name or serial")
def night(state: str, device: Optional[str]):
    """Enable or disable night mode."""
    device_config = get_device(device)
    if not device_config:
        console.print("[red]No device found.[/red]")
        sys.exit(1)

    ip = device_config.get("ip")
    if not ip:
        console.print("[red]No IP configured.[/red]")
        sys.exit(1)

    try:
        from libdyson import get_device as libdyson_get_device
    except ImportError:
        console.print("[red]Error: libdyson not installed.[/red]")
        sys.exit(1)

    dyson_device = libdyson_get_device(
        device_config["serial"],
        device_config["credential"],
        device_config["product_type"],
    )

    try:
        dyson_device.connect(ip)
        time.sleep(1)

        enable = state == "on"
        dyson_device.enable_night_mode() if enable else dyson_device.disable_night_mode()
        console.print(f"[green]✓ Night mode {'enabled' if enable else 'disabled'}[/green]")

        dyson_device.disconnect()

    except Exception as e:
        console.print(f"[red]Failed: {e}[/red]")
        sys.exit(1)


@cli.command("default")
@click.argument("name")
def set_default(name: str):
    """Set the default device."""
    if set_default_device(name):
        console.print(f"[green]✓ Default device set to {name}[/green]")
    else:
        console.print(f"[red]Device '{name}' not found.[/red]")
        sys.exit(1)


@cli.command("remove")
@click.argument("name")
@click.option("--force", "-f", is_flag=True, help="Skip confirmation")
def remove_device(name: str, force: bool):
    """Remove a device from the config."""
    config = load_config()
    devices = config.get("devices", [])
    
    # Find device
    device = None
    for d in devices:
        if d.get("name", "").lower() == name.lower() or d.get("serial", "").lower() == name.lower():
            device = d
            break
    
    if not device:
        console.print(f"[red]Device '{name}' not found.[/red]")
        sys.exit(1)
    
    if not force:
        if not click.confirm(f"Remove {device.get('name')} ({device.get('serial')})?"):
            console.print("Cancelled.")
            return
    
    config["devices"] = [d for d in devices if d.get("serial") != device.get("serial")]
    
    # Update default if needed
    if config.get("default_device") == device.get("name"):
        config["default_device"] = config["devices"][0]["name"] if config["devices"] else None
    
    save_config(config)
    console.print(f"[green]✓ Removed {device.get('name')}[/green]")


if __name__ == "__main__":
    cli()