文件预览

util.py

查看 Child Social Interaction Frequency & Duration Analysis | 儿童社交互动频次与时长分析 技能包中的文件内容。

文件内容

skills/smyx_common/scripts/util.py

#!/usr/bin/env python3
import json
import os
import traceback

import requests
from .config import ApiEnum, ConstantEnum, sys, YamlUtil

from .base import BaseUtil
import time
import logging
from typing import Any, Callable, Optional, TypeVar, Dict
import pydash as _

if ConstantEnum.is_debug():
    import http.client

    # 【关键代码】开启调试模式
    http.client.HTTPConnection.debuglevel = 1
    # 可选:如果你希望日志更整洁,可以配合 logging 模块(否则打印会比较乱)
    import logging

    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True


class StringUtil(BaseUtil):

    @staticmethod
    def camel_to_snake(name):
        import re
        s1 = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
        return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower()

    @staticmethod
    def snake_to_pascal(name):
        import re
        name = re.sub(r'^([a-z])', lambda m: m.group(1).upper(), name)
        return re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)

    @staticmethod
    def snake_to_camel(name):
        import re
        # 逻辑:匹配 '_[a-z]' (下划线+小写字母),将其替换为对应的大写字母(去掉下划线)
        return re.sub(r'_([a-z])', lambda m: m.group(1).upper(), name)


class FileUtil(BaseUtil):

    @staticmethod
    def get_fullname(path):
        try:
            return os.path.basename(path)
        except Exception as e:
            CommonUtil.trace_exception_stack(e)
            return ""

    @staticmethod
    def get_name(path):
        try:
            return os.path.splitext(os.path.basename(path))[0]
        except Exception as e:
            CommonUtil.trace_exception_stack(e)

    @staticmethod
    def get_ext(path):
        try:
            return os.path.splitext(os.path.basename(path))[1]
        except Exception as e:
            CommonUtil.trace_exception_stack(e)

    @staticmethod
    def open(path):
        try:
            return open(path, 'w', encoding='utf-8')
        except Exception as e:
            CommonUtil.trace_exception_stack(e)

    @staticmethod
    def mkdir(path):
        try:
            os.makedirs(path, exist_ok=True)
        except Exception as e:
            CommonUtil.trace_exception_stack(e)


class JsonUtil(BaseUtil):

    @staticmethod
    def stringify(json_obj, default_str=""):
        try:
            return json.dumps(json_obj, ensure_ascii=False, indent=2)
        except Exception as e:
            CommonUtil.trace_exception_stack(e)
            pass
        return default_str

    @staticmethod
    def parse(json_str, default_json={}):
        try:
            return json.loads(json_str)
        except Exception as e:
            # CommonUtil.trace_exception_stack(e)
            pass
        return default_json


class NumberUtil(BaseUtil):
    INT_MAX = 2147483647


class CommonUtil(BaseUtil):

    @staticmethod
    def trace_exception_stack(e):
        if ConstantEnum.is_debug():
            print(f"❌ 错误描述: {str(e)}, 堆栈跟踪:")
            traceback.print_stack()

    @staticmethod
    def polling(
            action: Callable[[], Any],
            check_condition: Callable[[Any], bool],
            on_success: Optional[Callable[[Any], None]] = None,
            on_retry: Optional[Callable[[Any, int], None]] = None,
            on_error: Optional[Callable[[Exception], None]] = None,
            interval: float = 1.0,
            max_attempts: int = 5,
            description: str = "轮询任务"
    ) -> Optional[Any]:
        """
        通用的轮询处理函数

        :param action:
            [必填] 执行动作的回调函数。
            例如:发送 HTTP 请求、查询数据库状态等。
            必须返回一个结果对象供 check_condition 使用。

        :param check_condition:
            [必填] 检查是否结束的回调函数。
            接收 action 的返回值,返回 True 表示“满足结束条件”,False 表示“继续轮询”。
            例如:lambda res: res.get('need_refresh') is False

        :param on_success:
            [可选] 当 check_condition 返回 True 时执行的回调(通常用于记录日志或处理最终数据)。

        :param on_retry:
            [可选] 当需要继续轮询时执行的回调。
            参数:(当前结果, 当前尝试次数)。可用于打印进度。

        :param on_error:
            [可选] 当 action 抛出异常时执行的回调。
            参数:(异常对象)。

        :param interval:
            每次轮询之间的等待时间(秒)。

        :param max_attempts:
            最大尝试次数,防止死循环。

        :param description:
            任务描述,用于日志输出。

        :return:
            如果成功,返回 action 的最后一次返回值;如果超时或失败,返回 None。
        """

        attempts = 0

        ConstantEnum.is_debug() and print(f"🚀 开始执行 [{description}]...")

        while attempts < max_attempts:
            attempts += 1

            try:
                # 1. 执行动作
                result = action()
                last_result = result

                # 2. 检查条件
                if check_condition(result):
                    ConstantEnum.is_debug() and print(
                        f"✅ [{description}] 成功!条件已满足 (尝试次数: {attempts}, 耗时{interval * attempts}秒)")
                    if on_success:
                        on_success(result)
                    return result

                # 3. 条件未满足,准备重试
                if on_retry:
                    on_retry(result, attempts)
                else:
                    # 默认日志行为
                    ConstantEnum.is_debug() and print(
                        f"⏳ [{description}] 条件未满足,{interval}秒后重试... ({attempts}/{max_attempts}, 耗时{interval * attempts}秒)")

                time.sleep(interval)

            except Exception as e:
                # 4. 异常处理
                if on_error:
                    on_error(e)
                else:
                    # 默认错误行为:打印错误并继续
                    logging.error(f"❌ [{description}] 发生异常: {e}")
                    ConstantEnum.is_debug() and print(f"⚠️ [{description}] 遇到错误,{interval}秒后重试...")

                time.sleep(interval)

        # 5. 超时处理
        ConstantEnum.is_debug() and print(f"⚠️ [{description}] 失败:达到最大尝试次数 ({max_attempts}),强制停止。")
        return None

    @staticmethod
    def is_windows():
        return sys.platform.startswith('win')

    @staticmethod
    def is_empty(data):
        # 1. 如果是 None (对应 JSON 的 null)
        if data is None:
            return True

        # 2. 如果是字典或列表,且长度为 0 (对应 {} 或 [])
        if isinstance(data, (dict, list)) and len(data) == 0:
            return True


from datetime import date, datetime


class DatetimeUtil(BaseUtil):
    FORMAT__DATETIME = "%Y-%m-%d %H:%M:%S"

    @staticmethod
    def now_str():
        return DatetimeUtil.format(DatetimeUtil.now())

    @staticmethod
    def today_str():
        return DatetimeUtil.format_date(DatetimeUtil.today())

    @staticmethod
    def now():
        return datetime.now()

    @staticmethod
    def today():
        return DatetimeUtil.now().replace(hour=0, minute=0, second=0, microsecond=0)

    @staticmethod
    def format(date):
        return date.strftime('%Y-%m-%d %H:%M:%S') if type(date) == datetime else date

    @staticmethod
    def format_date(date):
        return date.strftime('%Y-%m-%d') if type(date) == datetime else date

    @staticmethod
    def parse(date_str):
        if type(date_str) == int:
            return datetime.fromtimestamp(date_str)
        return datetime.strptime(date_str, DatetimeUtil.FORMAT__DATETIME) if type(date_str) == str else date_str

    @staticmethod
    def timestamp(date=datetime.now()):
        return int(date.timestamp() * 1000)


class RequestUtil(BaseUtil):
    BASE_URL = ApiEnum.BASE_URL_OPEN_API
    AUTHORIZATION_RETRY_COUNT_MAX = 3
    authorization_retry_count = 0

    @classmethod
    def http_post(cls, url, data=None, params=None, headers=None, *args, **argss):
        return cls.http_request("post", url, data=data, params=params, headers=headers, *args, **argss)

    @classmethod
    def http_put(cls, url, data=None, params=None, headers=None, *args, **argss):
        return cls.http_request("put", url, data=data, params=params, headers=headers, *args, **argss)

    @classmethod
    def http_delete(cls, url, data=None, params=None, headers=None, *args, **argss):
        return cls.http_request("delete", url, data=data, params=params, headers=headers, *args, **argss)

    @classmethod
    def http_get(cls, url, params=None, headers=None, *args, **argss):
        return cls.http_request("get", url, params=params, headers=headers, *args, **argss)

    @classmethod
    def http_request(cls, method, url, data=None, params=None, headers=None, options=None, *args,
                     timeout=ApiEnum.DEFAULT__REQUEST_TIMEOUT, **argss):
        def _get_or_create_user(username):
            _url = ApiEnum.BASE_URL_HEALTH + "/sys/phoneLogin"
            open_id = username
            _data = {
                "silent": 1,
                "register": 1,
                "openId": open_id,
                "mobile": username,
                "source": ConstantEnum.DEFAULT__SKILL_HUB_NAME
            }
            try:
                _response = requests.post(_url, json=_data)
                if _response.status_code == 200:
                    _response_json = _response.json()
                    if _response_json and _response_json.get("success"):
                        return _response_json and _response_json.get("result")
            except Exception as _e:
                CommonUtil.trace_exception_stack(_e)
            return {}

        try:
            headers = headers or {}
            if not url.startswith("https://") and not url.startswith("http://"):
                url = cls.BASE_URL + url
            headers['App-Id'] = ConstantEnum.APP__ID
            # ConstantEnum.CURRENT__USER_NAME = ConstantEnum.CURRENT__OPEN_ID = "ou_86fdd8e0d5f116c18a9dd550abefe6d2"
            current__user_name = ApiEnum.API_SECRET_KEY or ConstantEnum.CURRENT__USER_NAME or ConstantEnum.CURRENT__OPEN_ID
            found_user = None
            if (not ApiEnum.TOKEN or not ApiEnum.OPEN_TOKEN) and current__user_name:
                try:
                    from .dao import UserDao, User
                    user_dao = UserDao()
                    found_user = user_dao.get_by_username(current__user_name)
                    if found_user:
                        ApiEnum.TOKEN = found_user.token
                        ApiEnum.OPEN_TOKEN = found_user.open_token
                    if not ApiEnum.TOKEN or not ApiEnum.OPEN_TOKEN:
                        new_current_user = _get_or_create_user(current__user_name)
                        if new_current_user:
                            ApiEnum.TOKEN = new_current_user.get("token")
                            ApiEnum.OPEN_TOKEN = new_current_user.get("openToken")

                            current_user_info = new_current_user.get("userInfo")
                            if current_user_info:
                                current_user_info["token"] = new_current_user.get("token")
                                current_user_info["openToken"] = new_current_user.get(
                                    "openToken")
                                user_model = User.load(current_user_info)

                                user = user_dao.save(
                                    user_model
                                )

                except Exception as e:
                    CommonUtil.trace_exception_stack(e)
                    raise

            headers.setdefault("X-Access-Token", ApiEnum.TOKEN)
            headers.setdefault("X-Api-Key", ApiEnum.API_SECRET_KEY)
            headers.setdefault("Authorization", ApiEnum.OPEN_TOKEN)

            data = data or {}
            params = params or {}
            options = options or {}
            ConstantEnum.CURRENT__TENTANT_CODE and data.setdefault('tenantCode', ConstantEnum.CURRENT__TENTANT_CODE)
            ConstantEnum.DEFAULT__SKILL_HUB_NAME and data.setdefault('skillHubName',
                                                                     ConstantEnum.DEFAULT__SKILL_HUB_NAME)
            ConstantEnum.DEFAULT__SKILL_PLATFORM_NAME and data.setdefault('skillPlatform',
                                                                          ConstantEnum.DEFAULT__SKILL_PLATFORM_NAME)
            if current__user_name:
                data.setdefault('pnaUserName', current__user_name)

            if bool(options.get("dataAsParams")):
                params.update(data)

            # 安全打印:不打印完整 headers(避免巨大的 token 导致输出缓冲区溢出)
            # 同时处理 v 为 None 的情况,避免 len(None) 报错
            safe_headers = {}
            for k, v in headers.items():
                if v is None:
                    safe_headers[k] = "None"
                elif isinstance(v, (dict, list)):
                    safe_headers[k] = type(v).__name__
                elif len(v) > 30:
                    safe_headers[k] = v[:20] + "..."
                else:
                    safe_headers[k] = v
            ConstantEnum.is_debug() and print(f"🔄 请求拦截, URL:{url}", "method", method, "params", params, "data",
                                              data, "headers", safe_headers,
                                              "options", options,
                                              "timeout",
                                              timeout)
            response = requests.request(method, url, *args, json=data, params=params, headers=headers,
                                        timeout=int(timeout), **argss)
            response_text0 = response.text
            response_text = response_text0 if ConstantEnum.is_debug() else response
            status_code = response.status_code
            if current__user_name == "13800000000":
                status_code = 402
            if status_code == 401 and cls.authorization_retry_count < cls.AUTHORIZATION_RETRY_COUNT_MAX:
                # print(f"❌ 请求拦截, 鉴权失败:{response_text}, url:{url}", "method", method, "params", params,
                #       "data",
                #       data,
                #       "headers",
                #       headers,
                #       "timeout",
                #       timeout)
                ApiEnum.TOKEN = ApiEnum.OPEN_TOKEN = None
                if found_user:
                    found_user.token = found_user.open_token = None
                    user_dao.update(found_user)
                cls.authorization_retry_count += 1
                return cls.http_request(method, url, data, params, headers, options, *args, timeout=timeout, **argss)
            elif status_code == 402:
                ConstantEnum.is_debug() and print(f"❌ 请求拦截, 使用失败:{response_text}, url:{url}", "method", method,
                                                  "params", params,
                                                  "data",
                                                  data,
                                                  "headers",
                                                  headers,
                                                  "timeout",
                                                  timeout)
                return f'''⚠️ 因账户余额不足, 技能使用失败, 请按照如下步骤进行充值: 
👉 1. 先输入命令 "安装支付技能 smyx-payment", 等待安装完成. (如果已经安装支付技能过则忽略此步骤)
👉 2. 再输入命令 "技能账户充值", 然后跟随系统提示操作后即可继续使用技能.
{response_text0 or ""}
'''
                # f'请先输入命令 "技能账户充值", 请先确确保安装生命涌现支付技能 请先输入命令 "技能账户充值", 跟随系统提示进行充值后即可继续使用技能(). {response_text0 or ""}')
            elif status_code != 200:
                raise requests.exceptions.RequestException(
                    response, response=response)
            response_json = response.json()
            if not bool(response_json['success']):
                raise requests.exceptions.RequestException(
                    response, response=response)
            response_json_data = response_json.get("data", response_json.get("result"))
            response_json_data = response_json_data.get("records") if response_json_data and type(
                response_json_data) == dict and "records" in response_json_data else response_json_data
            ConstantEnum.is_debug() and print(f"✅ 请求拦截, 成功:{response_text}, url:{url}", "method", method,
                                              "params", params,
                                              "data",
                                              data,
                                              "headers",
                                              headers,
                                              "timeout",
                                              timeout)
            return response_json_data
        except Exception as e:
            CommonUtil.trace_exception_stack(e)
            response_text = _.get(e.args, '0.text')
            ConstantEnum.is_debug() and print(
                f"❌ 请求拦截, 失败: {e}, e.response.text: {response_text}, url:{url}",
                "method",
                method,
                "params",
                params,
                "data", data, "headers",
                "response", hasattr(e, 'response') and e.response,
                headers,
                "timeout",
                timeout)
            raise