文件预览

fc.py

查看 Alibabacloud Compute Provision 技能包中的文件内容。

文件内容

scripts/fc.py

#!/usr/bin/env python3
"""Alibaba Cloud Function Compute (FC 3.0) management.

Capabilities:
  - list_regions  (region query)
  - create_function / get_function / list_functions  (function creation)
  - invoke_function  (script execution)
  - delete_function

FC 3.0 uses ROA-style API (version=2023-03-30).
Endpoint pattern: {account_id}.{region}.fc.aliyuncs.com
"""

from __future__ import annotations

import base64
import json
import sys
import os
import time
import zipfile
import tempfile
from typing import Any

sys.path.insert(0, os.path.dirname(__file__))

from common import USER_AGENT, build_client, call_rpc_api, call_roa_api, get_default_region, pp

# FC 3.0 API version; passed as `version` on every ROA call made by this module.
VERSION = "2023-03-30"

# Account ID obtained from STS GetCallerIdentity; filled lazily by
# _get_account_id() so the STS call happens at most once per process.
_cached_account_id: str | None = None
# Names of functions for which this process has enabled asyncTask through
# put_async_invoke_config(); consulted to skip redundant configuration calls.
_async_task_configured: set[str] = set()


def _get_account_id() -> str:
    """Resolve the Alibaba Cloud account ID used to build FC endpoints.

    Lookup order:
      1. ``ALIBABA_CLOUD_ACCOUNT_ID`` environment variable (returned as-is,
         never cached).
      2. The module-level cache filled by an earlier STS lookup.
      3. A fresh STS ``GetCallerIdentity`` call, whose result is cached.

    Raises:
        RuntimeError: if the STS response carries no (or an empty) AccountId.
    """
    global _cached_account_id

    from_env = os.environ.get("ALIBABA_CLOUD_ACCOUNT_ID", "")
    if from_env:
        return from_env

    if not _cached_account_id:
        identity = call_rpc_api(
            product="sts",
            version="2015-04-01",
            action="GetCallerIdentity",
            endpoint="sts.aliyuncs.com",
        )
        _cached_account_id = identity.get("AccountId", "")
        if not _cached_account_id:
            raise RuntimeError("Failed to get AccountId from STS GetCallerIdentity")
        print(f"Account ID auto-detected: {_cached_account_id}")

    return _cached_account_id


def _get_endpoint(region: str | None = None, account_id: str | None = None) -> str:
    """Return the FC host name ``{account_id}.{region}.fc.aliyuncs.com``.

    A falsy ``region`` is replaced by get_default_region(); a falsy
    ``account_id`` is replaced by _get_account_id().
    """
    resolved_region = region if region else get_default_region()
    resolved_account = account_id if account_id else _get_account_id()
    return f"{resolved_account}.{resolved_region}.fc.aliyuncs.com"


def _call(
    pathname: str,
    method: str = "GET",
    query: dict | None = None,
    body: dict | None = None,
    region: str | None = None,
    account_id: str | None = None,
    body_type: str = "json",
) -> dict | str:
    """Send one FC ROA request and return whatever call_roa_api returns.

    The endpoint is resolved on every call from ``region`` / ``account_id``;
    all remaining arguments are forwarded unchanged together with VERSION.
    """
    request_kwargs = {
        "version": VERSION,
        "pathname": pathname,
        "method": method,
        "query": query,
        "body": body,
        "region": region,
        "endpoint": _get_endpoint(region, account_id),
        "body_type": body_type,
    }
    return call_roa_api(**request_kwargs)


# ---------------------------------------------------------------------------
# Region query
# ---------------------------------------------------------------------------

def list_regions(region: str | None = None, account_id: str | None = None) -> list:
    """Return a hardcoded list of regions where FC is commonly available.

    No API request is made and both parameters are ignored; the module notes
    that FC has no dedicated ListRegions API at the 2023-03-30 level. The
    list is printed before it is returned, and a new list object is built on
    every call.
    """
    cn_prefixed = [
        "cn-hangzhou",
        "cn-shanghai",
        "cn-beijing",
        "cn-shenzhen",
        "cn-chengdu",
        "cn-hongkong",
    ]
    other_prefixed = [
        "ap-southeast-1",
        "us-west-1",
        "eu-central-1",
        "ap-northeast-1",
    ]
    common_regions = cn_prefixed + other_prefixed
    print(f"FC commonly available regions: {common_regions}")
    return common_regions


# ---------------------------------------------------------------------------
# Function management
# ---------------------------------------------------------------------------

def create_function(
    function_name: str,
    runtime: str = "python3.10",
    handler: str = "index.handler",
    code_dir: str | None = None,
    code_zip_path: str | None = None,
    code_inline: str | None = None,
    memory_size: int = 512,
    cpu: float = 0.35,
    timeout: int = 300,
    disk_size: int = 512,
    gpu_memory_size: int | None = None,
    environment_variables: dict | None = None,
    description: str = "Created by Alibaba Cloud Compute Provision skill",
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Create an FC 3.0 function via POST /2023-03-30/functions.

    Code source (first truthy one wins, in this order):
      - code_inline: source text, zipped as ``<handler module>.py``
      - code_dir: local directory, zipped recursively
      - code_zip_path: path of an already-built zip file
    When none is given the request is sent without a ``code`` field.

    ``gpuConfig`` and ``environmentVariables`` are added to the request only
    when the matching argument is truthy.

    For custom-runtime, set runtime='custom' and handler to your entrypoint.

    Returns:
        The response returned by the FC API call.
    """
    request_body: dict[str, Any] = dict(
        functionName=function_name,
        runtime=runtime,
        handler=handler,
        memorySize=memory_size,
        cpu=cpu,
        timeout=timeout,
        diskSize=disk_size,
        description=description,
    )
    if gpu_memory_size:
        request_body["gpuConfig"] = {"gpuMemorySize": gpu_memory_size}
    if environment_variables:
        request_body["environmentVariables"] = environment_variables

    # Resolve the archive bytes from whichever code source was supplied.
    archive: bytes | None = None
    if code_inline:
        archive = _create_inline_zip(code_inline, handler)
    elif code_dir:
        archive = _zip_directory(code_dir)
    elif code_zip_path:
        with open(code_zip_path, "rb") as zip_file:
            archive = zip_file.read()

    if archive:
        # The API takes the zip as a base64 string inside the JSON body.
        request_body["code"] = {"zipFile": base64.b64encode(archive).decode("utf-8")}

    response = _call(
        "/2023-03-30/functions",
        method="POST",
        body=request_body,
        region=region,
        account_id=account_id,
    )
    print(f"Function created: {function_name}")
    return response


def get_function(
    function_name: str,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Fetch one function's details via GET /2023-03-30/functions/{name}."""
    path = "/2023-03-30/functions/{}".format(function_name)
    return _call(path, region=region, account_id=account_id)


def list_functions(
    prefix: str | None = None,
    limit: int = 50,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """List functions via GET /2023-03-30/functions.

    ``limit`` is always sent as a query parameter; ``prefix`` is sent only
    when it is truthy.
    """
    prefix_filter = {"prefix": prefix} if prefix else {}
    query: dict[str, Any] = {"limit": limit, **prefix_filter}
    return _call(
        "/2023-03-30/functions",
        query=query,
        region=region,
        account_id=account_id,
    )


def delete_function(
    function_name: str,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Delete a function via DELETE /2023-03-30/functions/{name}.

    The call is made with ``body_type="none"``.
    """
    path = "/2023-03-30/functions/{}".format(function_name)
    return _call(
        path,
        method="DELETE",
        region=region,
        account_id=account_id,
        body_type="none",
    )


def put_async_invoke_config(
    function_name: str,
    async_task: bool = True,
    max_async_retry_attempts: int = 0,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Configure async invocation for a function.

    Required for GetAsyncTask to work — must enable asyncTask before async invoke.

    Args:
        function_name: Function whose async-invoke config is written.
        async_task: Value sent as ``asyncTask``.
        max_async_retry_attempts: Value sent as ``maxAsyncRetryAttempts``.
        region: Region override; resolved by the endpoint helper when None.
        account_id: Account ID override; resolved when None.

    Returns:
        The response of the PUT .../async-invoke-config call.
    """
    body = {
        "asyncTask": async_task,
        "maxAsyncRetryAttempts": max_async_retry_attempts,
    }
    result = _call(
        f"/2023-03-30/functions/{function_name}/async-invoke-config",
        method="PUT",
        body=body,
        region=region,
        account_id=account_id,
    )
    # Keep the process-local cache in step with what was just written:
    # without the discard, disabling asyncTask would leave a stale entry and
    # invoke_function_and_wait() would skip re-enabling it.
    if async_task:
        _async_task_configured.add(function_name)
    else:
        _async_task_configured.discard(function_name)
    print(f"Async invoke config set for {function_name}: asyncTask={async_task}")
    return result


def delete_async_invoke_config(
    function_name: str,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Delete async invocation config for a function.

    After a successful delete the function is also removed from the
    process-local ``_async_task_configured`` cache, so a later
    invoke_function_and_wait() enables asyncTask again instead of assuming
    it is still configured.

    Returns:
        The response of the DELETE .../async-invoke-config call.
    """
    result = _call(
        f"/2023-03-30/functions/{function_name}/async-invoke-config",
        method="DELETE",
        region=region,
        account_id=account_id,
        body_type="none",
    )
    # Only reached when the call did not raise; discard is a no-op if absent.
    _async_task_configured.discard(function_name)
    return result


# ---------------------------------------------------------------------------
# Function invocation (script execution)
# ---------------------------------------------------------------------------

def invoke_function(
    function_name: str,
    payload: str | dict | None = None,
    invocation_type: str | None = None,
    log_type: str = "Tail",
    qualifier: str | None = None,
    async_task_id: str | None = None,
    timeout: int | None = None,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Invoke a function through POST .../functions/{name}/invocations.

    Args:
        function_name: Function to invoke.
        payload: Event payload. A string is sent as-is, any other truthy
            value is JSON-encoded, and a falsy value sends an empty body.
        invocation_type: Sync | Async. When None it is chosen from
            ``timeout``: Async if timeout > 60, otherwise Sync.
        log_type: Sent as ``x-fc-log-type`` for Sync invocations only.
        qualifier: Version or alias, sent as a query parameter when truthy.
        async_task_id: Sent as ``x-fc-async-task-id`` when truthy. For Async
            calls without one, ``acf-`` + 16 hex characters is generated.
        timeout: Only used for the automatic Sync/Async choice above; the
            function's configured timeout is not looked up here.
        region: Region override; the default region is used when falsy.
        account_id: Account ID override for the endpoint.

    Returns:
        For Sync: the ``body`` of the SDK result (or the whole result when
        it has no ``body`` key).
        For Async: ``{"taskId": ..., "invocationType": "Async"}`` for polling
        via get_async_task.
    """
    if invocation_type is None:
        if timeout and timeout > 60:
            invocation_type = "Async"
        else:
            invocation_type = "Sync"
    is_async = invocation_type == "Async"

    if is_async and not async_task_id:
        import uuid
        async_task_id = "acf-" + uuid.uuid4().hex[:16]

    endpoint = _get_endpoint(region, account_id)
    resolved_region = region or get_default_region()

    # SDK modules are imported lazily, only when an invocation is made.
    from alibabacloud_tea_openapi import models as open_api_models
    from darabonba.runtime import RuntimeOptions

    client = build_client(endpoint, resolved_region)

    query_params: dict[str, str] = {"qualifier": qualifier} if qualifier else {}

    headers: dict[str, str] = {"x-fc-invocation-type": invocation_type}
    if invocation_type == "Sync":
        headers["x-fc-log-type"] = log_type
    if async_task_id:
        headers["x-fc-async-task-id"] = async_task_id

    if not payload:
        body_str = ""
    elif isinstance(payload, str):
        body_str = payload
    else:
        body_str = json.dumps(payload, ensure_ascii=False)

    api_params = open_api_models.Params(
        action="InvokeFunction",
        version=VERSION,
        protocol="HTTPS",
        pathname=f"/2023-03-30/functions/{function_name}/invocations",
        method="POST",
        auth_type="AK",
        style="ROA",
        # Async invocations are sent with body_type "none", Sync with "json".
        body_type="none" if is_async else "json",
        req_body_type="json",
    )
    request = open_api_models.OpenApiRequest(
        query=query_params or None,
        headers=headers,
        body=body_str,
    )
    response = client.call_api(api_params, request, RuntimeOptions())
    response_body = response.get("body", response)

    if is_async:
        print(f"Function {function_name} invoked asynchronously, taskId: {async_task_id}")
        return {"taskId": async_task_id, "invocationType": "Async"}

    print(f"Function {function_name} invoked synchronously")
    return response_body


def get_async_task(
    function_name: str,
    task_id: str,
    qualifier: str | None = None,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Fetch an async task via GET .../functions/{name}/async-tasks/{task_id}.

    Returns dict with: taskId, status, startedTime, endTime, durationMs,
    taskErrorMessage, taskPayload, returnPayload, etc.

    Status values: Running, Succeeded, Failed, Stopped, Expired, Retrying.

    ``qualifier`` is sent as a query parameter only when truthy; otherwise no
    query is passed at all.
    """
    query = {"qualifier": qualifier} if qualifier else None
    path = "/2023-03-30/functions/{}/async-tasks/{}".format(function_name, task_id)
    return _call(path, query=query, region=region, account_id=account_id)


def invoke_function_and_wait(
    function_name: str,
    payload: str | dict | None = None,
    timeout: int = 300,
    qualifier: str | None = None,
    region: str | None = None,
    account_id: str | None = None,
    poll_interval: int = 5,
) -> dict:
    """Invoke a function and block until its result is available.

    A timeout of 60s or less runs synchronously and returns the invocation
    result directly. A longer timeout runs asynchronously: asyncTask is
    enabled first (unless this process already did so for the function),
    then GetAsyncTask is polled every ``poll_interval`` seconds.

    Returns:
        The Sync response, or the async task dict once it is ``Succeeded``.

    Raises:
        RuntimeError: the task ended as Failed, Stopped or Expired.
        TimeoutError: no terminal status within ``timeout + 60`` seconds.
    """
    runs_sync = timeout <= 60

    if not runs_sync and function_name not in _async_task_configured:
        put_async_invoke_config(function_name, async_task=True, region=region, account_id=account_id)

    result = invoke_function(
        function_name=function_name,
        payload=payload,
        timeout=timeout,
        qualifier=qualifier,
        region=region,
        account_id=account_id,
    )

    # Anything that is not the Async marker dict is a finished Sync result.
    is_async_marker = isinstance(result, dict) and result.get("invocationType") == "Async"
    if not is_async_marker:
        return result

    task_id = result["taskId"]
    print(f"Waiting for async task {task_id}...")
    time.sleep(3)
    deadline = time.time() + timeout + 60
    failure_statuses = {"Failed", "Stopped", "Expired"}

    while time.time() < deadline:
        try:
            task = get_async_task(
                function_name,
                task_id,
                qualifier=qualifier,
                region=region,
                account_id=account_id,
            )
        except Exception as exc:
            # A not-yet-registered task is retried; any other error propagates.
            if "AsyncTaskNotFound" not in str(exc):
                raise
            print("  Async task not yet registered, retrying...")
            time.sleep(poll_interval)
            continue

        status = task.get("status", "")
        print(f"  Async task status: {status}")
        if status == "Succeeded":
            return task
        if status in failure_statuses:
            err = task.get("taskErrorMessage", "unknown error")
            raise RuntimeError(f"Async task {task_id} {status}: {err}")
        time.sleep(poll_interval)

    raise TimeoutError(f"Async task {task_id} timed out after {timeout + 60}s")


def _generate_function_name(prefix: str = "acf-task") -> str:
    """Build ``{prefix}-{YYYYmmdd-HHMMSS}-{6 hex chars}`` as a function name.

    The timestamp comes from time.strftime (local time) and the suffix from
    a random UUID4.
    """
    import uuid

    return "{}-{}-{}".format(
        prefix,
        time.strftime("%Y%m%d-%H%M%S"),
        uuid.uuid4().hex[:6],
    )


def create_and_invoke(
    function_name: str | None = None,
    script_content: str | None = None,
    script_path: str | None = None,
    script_type: str = "python",
    runtime: str = "python3.10",
    handler: str = "index.handler",
    memory_size: int = 512,
    cpu: float = 0.35,
    timeout: int = 300,
    payload: str | dict | None = None,
    auto_cleanup: bool = True,
    region: str | None = None,
    account_id: str | None = None,
) -> dict:
    """Create a function from a script, run it, wait, and optionally delete it.

    Args:
        function_name: Function name; generated with a timestamp when None.
        script_content: Code string. If it does not contain ``def handler(``
            it is wrapped according to ``script_type``; otherwise it is
            uploaded unchanged.
        script_path: Path of a local script file. When given it takes
            precedence over ``script_content``; the file is always wrapped,
            and a ``.sh`` / ``.py`` extension overrides ``script_type``.
        script_type: "shell" | "python"; selects the wrapper for raw code.
        auto_cleanup: When True (default), the async-invoke config (if this
            process set one) and the function are deleted afterwards, even
            if the invocation raised. Cleanup failures are only printed.
        Other args: passed to create_function / invoke_function_and_wait.

    Returns:
        Execution result dict.

    Raises:
        ValueError: no script was supplied, or Memory(GB)/CPU is outside
            the range 1 to 4.
    """
    if function_name is None:
        function_name = _generate_function_name()

    if script_path:
        with open(script_path, "r", encoding="utf-8") as source_file:
            raw_script = source_file.read()
        # A recognised extension wins over the caller-supplied script_type.
        for extension, inferred_type in ((".sh", "shell"), (".py", "python")):
            if script_path.endswith(extension):
                script_type = inferred_type
                break
        script_content = _auto_wrap(raw_script, script_type, timeout)
    elif script_content and "def handler(" not in script_content:
        script_content = _auto_wrap(script_content, script_type, timeout)

    if not script_content:
        raise ValueError("Either script_content or script_path must be provided")

    # Validate the memory/CPU combination before any resource is created.
    mem_gb = memory_size / 1024
    ratio = mem_gb / cpu if cpu > 0 else 0
    ratio_out_of_range = ratio < 1 or ratio > 4
    if ratio_out_of_range:
        raise ValueError(
            f"FC requires Memory(GB)/CPU ratio between 1 and 4, "
            f"got {mem_gb:.2f}GB/{cpu}core = {ratio:.2f}. "
            f"Tip: use the defaults (memory_size=512, cpu=0.35) or "
            f"adjust to satisfy the constraint, e.g. memory_size=512, cpu=0.35."
        )

    create_function(
        function_name=function_name,
        runtime=runtime,
        handler=handler,
        code_inline=script_content,
        memory_size=memory_size,
        cpu=cpu,
        timeout=timeout,
        region=region,
        account_id=account_id,
    )
    time.sleep(2)

    try:
        return invoke_function_and_wait(
            function_name=function_name,
            payload=payload,
            timeout=timeout,
            region=region,
            account_id=account_id,
        )
    finally:
        if auto_cleanup:
            try:
                if function_name in _async_task_configured:
                    delete_async_invoke_config(function_name, region=region, account_id=account_id)
                    _async_task_configured.discard(function_name)
                delete_function(function_name, region=region, account_id=account_id)
                print(f"Function {function_name} auto-cleaned up")
            except Exception as cleanup_error:
                # Best effort: never let cleanup mask the invocation outcome.
                print(f"Warning: auto-cleanup of {function_name} failed: {cleanup_error}")


def _executor_handler_source() -> str:
    """Return the Python source uploaded as the reusable executor's handler."""
    return '''
import subprocess, json, base64, time, sys, os

def _as_text(value):
    # TimeoutExpired may carry None or bytes for partial output.
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode("utf-8", "replace")
    return value

def handler(event, context):
    start = time.time()
    try:
        evt = json.loads(event) if isinstance(event, (str, bytes)) else event
    except Exception:
        evt = {}
    if not isinstance(evt, dict):
        evt = {}

    script = evt.get("script", "")
    script_b64 = evt.get("script_b64", "")
    script_type = evt.get("type", "shell")
    # The configured timeout is read from context.function.timeout;
    # context.credentials has no timeout attribute.
    configured = getattr(getattr(context, "function", None), "timeout", None)
    fc_timeout = int(evt.get("timeout", configured or 300))
    sub_timeout = max(fc_timeout - 20, 10)

    if script_b64:
        script = base64.b64decode(script_b64).decode("utf-8")

    if not script:
        return json.dumps({"error": "No script provided in payload"})

    env_info = {"python": sys.version, "cwd": os.getcwd(), "platform": sys.platform}

    if script_type == "shell":
        try:
            r = subprocess.run(
                ["bash", "-c", script],
                capture_output=True, text=True, timeout=sub_timeout,
            )
        except subprocess.TimeoutExpired as exc:
            elapsed = round(time.time() - start, 3)
            return json.dumps({
                "stdout": _as_text(exc.stdout), "stderr": _as_text(exc.stderr),
                "returncode": 124,
                "error": "Script timed out after %ss" % sub_timeout,
                "elapsed_s": elapsed,
                "env": env_info,
            })
        elapsed = round(time.time() - start, 3)
        return json.dumps({
            "stdout": r.stdout, "stderr": r.stderr,
            "returncode": r.returncode, "elapsed_s": elapsed,
            "env": env_info,
        })
    else:
        old_stdout, old_stderr = sys.stdout, sys.stderr
        from io import StringIO
        sys.stdout, sys.stderr = StringIO(), StringIO()
        exec_globals = {"__builtins__": __builtins__}
        try:
            exec(script, exec_globals)
            elapsed = round(time.time() - start, 3)
            return json.dumps({
                "stdout": sys.stdout.getvalue(), "stderr": sys.stderr.getvalue(),
                "returncode": 0, "elapsed_s": elapsed,
                "env": env_info,
            })
        except Exception as e:
            elapsed = round(time.time() - start, 3)
            return json.dumps({
                "stdout": sys.stdout.getvalue(), "stderr": sys.stderr.getvalue() + str(e),
                "returncode": 1, "error": str(e), "elapsed_s": elapsed,
                "env": env_info,
            })
        finally:
            sys.stdout, sys.stderr = old_stdout, old_stderr
'''


def create_script_executor(
    function_name: str | None = None,
    runtime: str = "python3.10",
    memory_size: int = 512,
    cpu: float = 0.35,
    timeout: int = 600,
    region: str | None = None,
    account_id: str | None = None,
) -> str:
    """Create a reusable script executor function.

    The executor accepts scripts via payload, so one function can run
    many different scripts without recreation.

    Payload format:
        {"script": "echo hello", "type": "shell"}
        {"script": "print(1+1)", "type": "python"}
    Optional payload keys: ``script_b64`` (base64 of the script, overrides
    ``script``) and ``timeout`` (seconds). ``type`` defaults to "shell".

    Inside the handler the subprocess timeout is the payload ``timeout`` (or
    ``context.function.timeout``, or 300) minus 20 seconds, never below 10.
    A shell script that exceeds it yields a JSON result with
    ``returncode`` 124 and an ``error`` message instead of an uncaught
    exception. A payload that is not a JSON object is treated as empty.

    NOTE(review): the handler exec()s / shell-runs whatever the payload
    contains; only expose this function to trusted callers.

    Args:
        function_name: Name to create; generated with an ``acf-executor``
            prefix when None.
        runtime, memory_size, cpu, timeout, region, account_id: forwarded to
            create_function.

    Returns:
        The function_name (for use with invoke_function_and_wait).
    """
    if function_name is None:
        function_name = _generate_function_name("acf-executor")

    create_function(
        function_name=function_name,
        runtime=runtime,
        handler="index.handler",
        code_inline=_executor_handler_source(),
        memory_size=memory_size,
        cpu=cpu,
        timeout=timeout,
        region=region,
        account_id=account_id,
    )
    print(f"Script executor created: {function_name}")
    return function_name


# ---------------------------------------------------------------------------
# Script wrapping helpers
# ---------------------------------------------------------------------------

def wrap_shell_script(script: str, timeout: int = 300) -> str:
    """Return FC handler source that runs ``script`` with ``bash -c``.

    The script is embedded as a base64 literal, so its content needs no
    quoting or escaping. The generated handler's subprocess timeout is
    ``timeout - 20`` seconds, with a floor of 10. The handler returns a
    JSON string with stdout, stderr, returncode, elapsed_s and env
    (Python version and working directory).
    """
    payload_b64 = base64.b64encode(script.encode("utf-8")).decode("ascii")
    run_timeout = max(timeout - 20, 10)
    handler_lines = [
        "import subprocess, json, base64, time, sys, os",
        "",
        "def handler(event, context):",
        "    start = time.time()",
        f'    script = base64.b64decode("{payload_b64}").decode("utf-8")',
        "    r = subprocess.run(",
        '        ["bash", "-c", script],',
        f"        capture_output=True, text=True, timeout={run_timeout},",
        "    )",
        "    elapsed = round(time.time() - start, 3)",
        "    return json.dumps({",
        '        "stdout": r.stdout, "stderr": r.stderr,',
        '        "returncode": r.returncode, "elapsed_s": elapsed,',
        '        "env": {"python": sys.version, "cwd": os.getcwd()},',
        "    })",
        "",  # keeps the trailing newline of the generated module
    ]
    return "\n".join(handler_lines)


def wrap_python_script(script: str, timeout: int = 300) -> str:
    """Return FC handler source that exec()s ``script`` and captures output.

    The script is embedded as a base64 literal. The generated handler swaps
    sys.stdout / sys.stderr for StringIO buffers while the script runs,
    restores them afterwards, and returns a JSON string with stdout, stderr,
    returncode (0, or 1 when the script raised an Exception), elapsed_s and
    env. ``timeout`` is accepted for signature parity with
    wrap_shell_script but does not affect the generated code.
    """
    payload_b64 = base64.b64encode(script.encode("utf-8")).decode("ascii")
    env_line = '            "env": {"python": sys.version, "cwd": os.getcwd()},'
    elapsed_line = "        elapsed = round(time.time() - start, 3)"
    handler_lines = [
        "import json, base64, time, sys, os",
        "from io import StringIO",
        "",
        "def handler(event, context):",
        "    start = time.time()",
        f'    script = base64.b64decode("{payload_b64}").decode("utf-8")',
        "    old_stdout, old_stderr = sys.stdout, sys.stderr",
        "    sys.stdout, sys.stderr = StringIO(), StringIO()",
        "    try:",
        '        exec(script, {"__builtins__": __builtins__})',
        elapsed_line,
        "        return json.dumps({",
        '            "stdout": sys.stdout.getvalue(), "stderr": sys.stderr.getvalue(),',
        '            "returncode": 0, "elapsed_s": elapsed,',
        env_line,
        "        })",
        "    except Exception as e:",
        elapsed_line,
        "        return json.dumps({",
        '            "stdout": sys.stdout.getvalue(),',
        '            "stderr": sys.stderr.getvalue() + "\\n" + str(e),',
        '            "returncode": 1, "error": str(e), "elapsed_s": elapsed,',
        env_line,
        "        })",
        "    finally:",
        "        sys.stdout, sys.stderr = old_stdout, old_stderr",
        "",  # keeps the trailing newline of the generated module
    ]
    return "\n".join(handler_lines)


def _auto_wrap(script: str, script_type: str, timeout: int) -> str:
    """Wrap a raw script as FC handler source.

    ``script_type == "shell"`` selects wrap_shell_script; every other value
    falls back to wrap_python_script.
    """
    wrapper = wrap_shell_script if script_type == "shell" else wrap_python_script
    return wrapper(script, timeout)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _create_inline_zip(code: str, handler: str) -> bytes:
    """Build an in-memory zip archive holding ``code`` as a single module.

    The file name is taken from the part of ``handler`` before the first
    dot, e.g. handler='index.handler' creates ``index.py``. A handler
    without a dot, or one starting with a dot, falls back to ``index.py``.

    The archive is assembled in a BytesIO buffer rather than a temporary
    file, so nothing is written to disk.

    Args:
        code: Source text stored in the archive.
        handler: FC handler string (``module.function``).

    Returns:
        The bytes of the DEFLATE-compressed zip archive.
    """
    import io

    # `or "index"` covers a handler such as ".handler" (empty module part).
    module_name = (handler.split(".")[0] if "." in handler else "index") or "index"

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(f"{module_name}.py", code)
    # getvalue() must come after the ZipFile is closed so that the central
    # directory has been written.
    return buffer.getvalue()


def _zip_directory(dir_path: str) -> bytes:
    """Zip every file under ``dir_path`` into an in-memory archive.

    Archive member names are paths relative to ``dir_path``. Directories and
    files are visited in sorted order so the member order is deterministic.
    The archive is built in a BytesIO buffer rather than a temporary file.

    Args:
        dir_path: Directory whose contents are archived.

    Returns:
        The bytes of the DEFLATE-compressed zip archive.

    Raises:
        NotADirectoryError: ``dir_path`` does not exist or is not a
            directory. Without this check os.walk would silently yield
            nothing and an empty archive would be produced.
    """
    import io

    base = os.path.abspath(dir_path)
    if not os.path.isdir(base):
        raise NotADirectoryError(f"code_dir is not a directory: {dir_path}")

    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(base):
            dirs.sort()  # in-place sort steers os.walk's traversal order
            for fn in sorted(files):
                full = os.path.join(root, fn)
                zf.write(full, os.path.relpath(full, base))
    return buffer.getvalue()