文件预览

encrypted_store.py

查看 1688 Shop Operate 技能包中的文件内容。

文件内容

scripts/encrypted_store.py

"""
文件存储后端

当 OS Keychain 不可用时(如沙箱环境),将 Token 和元数据
存储在本地 JSON 文件中。

文件权限设置为 0o600(仅文件所有者可读写)。
"""
from __future__ import annotations

import json
import logging
import os
import sys
import tempfile
from pathlib import Path

from _const import DATA_DIR

logger = logging.getLogger(__name__)

ENCRYPTED_TOKEN_FILE = DATA_DIR / ".token_store.json"


def _set_file_permissions(path: str) -> None:
    """设置文件权限为仅所有者可读写(跨平台)。"""
    if sys.platform == "win32":
        try:
            import subprocess
            username = os.environ.get("USERNAME", "")
            if username:
                subprocess.run(
                    ["icacls", path, "/inheritance:r", "/grant:r", f"{username}:F"],
                    check=True,
                    capture_output=True,
                )
        except Exception as e:
            logger.debug("Windows 文件权限设置失败(非致命): %s", e)
    else:
        os.chmod(path, 0o600)


def _load_store() -> dict[str, str]:
    if not ENCRYPTED_TOKEN_FILE.exists():
        return {}
    try:
        raw = ENCRYPTED_TOKEN_FILE.read_text(encoding="utf-8")
        return json.loads(raw)
    except (json.JSONDecodeError, Exception) as e:
        logger.warning("存储文件读取失败: %s,将重建", e)
        return {}


def _save_store(data: dict[str, str]) -> None:
    json_bytes = json.dumps(data, ensure_ascii=False, indent=2).encode("utf-8")
    ENCRYPTED_TOKEN_FILE.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp_path = tempfile.mkstemp(
        dir=str(ENCRYPTED_TOKEN_FILE.parent),
        prefix=".token_",
        suffix=".tmp",
    )
    try:
        os.write(fd, json_bytes)
        os.close(fd)
        _set_file_permissions(tmp_path)
        try:
            os.replace(tmp_path, str(ENCRYPTED_TOKEN_FILE))
        except PermissionError:
            if sys.platform == "win32":
                # Windows 下目标文件可能被其他进程持有,先删除再替换
                try:
                    os.unlink(str(ENCRYPTED_TOKEN_FILE))
                except OSError:
                    pass
                os.replace(tmp_path, str(ENCRYPTED_TOKEN_FILE))
            else:
                raise
    except Exception:
        try:
            os.close(fd)
        except OSError:
            pass
        if os.path.exists(tmp_path):
            os.unlink(tmp_path)
        raise


def enc_store_token(key: str, value: str) -> None:
    store = _load_store()
    store[key] = value
    _save_store(store)
    logger.debug("Token 已写入文件存储: key=%s", key)


def enc_load_token(key: str) -> str | None:
    store = _load_store()
    return store.get(key) or None


def enc_delete_token(key: str) -> None:
    store = _load_store()
    if key in store:
        del store[key]
        _save_store(store)
        logger.debug("Token 已从文件存储删除: key=%s", key)
    else:
        logger.debug("文件存储中无此 Token,跳过删除: key=%s", key)