文件预览

run.py

查看 unisound-glucose-monitor-record 技能包中的文件内容。

文件内容

scripts/run.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Blood-glucose monitoring record: a self-contained skill script.

Reads a glucose-record input file, normalizes the records and sends them to
the chat-completions API for a patient-facing analysis. Input parsing uses the
``preprocess`` module imported below; ``main`` only looks for
``_shared/doc-preprocess`` as a fallback after a ``PreprocessError``.
"""

import argparse
import json
import re
import sys
from datetime import date as date_type
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

# ── Fixed configuration ──
# Chat-completions endpoint that _call_llm sends its request to.
API_URL = "https://maas-api.hivoice.cn/v1/chat/completions"
# Model identifier placed in the "model" field of every request payload.
MODEL = "u1-insuremed"

# ── 同目录导入(preprocess.py 每个skill自带一份)──
from preprocess import (
    PreprocessError, SUPPORTED_FILE_TYPES,
    detect_input_type, load_input_artifact, first_matching_index, normalize_header,
)

# Maps each canonical record field to the key/header spellings accepted for it.
# main() passes this table to normalize_artifact, which uses it for both
# "key: value" text input and table header rows.
FIELD_ALIASES: Dict[str, List[str]] = {
    "value": ["value", "血糖值", "值"],
    "unit": ["unit", "单位"],
    "measure_type": ["measure_type", "测量类型", "类型"],
    "measured_at": ["measured_at", "测量时间"],
    "note": ["note", "备注"],
}


# ═══════════════════════════════════════════════════════════════════
# LLM 调用(内联,无外部依赖)
# ═══════════════════════════════════════════════════════════════════

def _call_llm(system_prompt: str, user_prompt: str, appkey: str) -> str:
    """Send one system+user exchange to the chat-completions API.

    Args:
        system_prompt: Content of the ``system`` message.
        user_prompt: Content of the ``user`` message.
        appkey: Credential sent in the ``Authorization: Bearer`` header.

    Returns:
        The ``content`` of the first choice's message, or ``""`` when the
        response carries no (or a null) content.

    Raises:
        RuntimeError: On an HTTP error status, an unreachable endpoint, a
            response body that is not JSON, or a response without ``choices``.
    """
    payload = {
        "model": MODEL,
        "temperature": 0.0,
        "messages": [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
    }
    request = Request(
        API_URL,
        data=json.dumps(payload, ensure_ascii=False).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": f"Bearer {appkey}"},
    )
    try:
        # The context manager closes the HTTP response even if reading fails.
        with urlopen(request, timeout=120) as resp:
            raw = resp.read().decode("utf-8")
    except HTTPError as exc:
        # Keep only the start of the error body so the message stays readable.
        detail = exc.read().decode("utf-8", errors="replace")[:500]
        raise RuntimeError(f"API HTTP {exc.code}: {detail}") from exc
    except URLError as exc:
        raise RuntimeError(f"API unreachable: {exc.reason}") from exc

    try:
        body = json.loads(raw)
    except ValueError as exc:
        raise RuntimeError(f"API returned non-JSON response: {raw[:500]}") from exc

    if not isinstance(body, dict) or not body.get("choices"):
        raise RuntimeError("API response missing choices")
    # "message" or "content" may be present but null; normalize both to "".
    message = body["choices"][0].get("message") or {}
    return message.get("content") or ""


def _extract_json(text: str) -> Optional[Dict[str, Any]]:
    text = text.strip()
    try:
        val = json.loads(text)
        if isinstance(val, dict):
            return val
    except (json.JSONDecodeError, ValueError):
        pass
    code_block = re.search(r"```(?:json)?\s*\n?(.*?)\n?\s*```", text, re.DOTALL)
    if code_block:
        try:
            val = json.loads(code_block.group(1).strip())
            if isinstance(val, dict):
                return val
        except (json.JSONDecodeError, ValueError):
            pass
    start = text.find("{")
    if start == -1:
        return None
    depth = 0
    for i in range(start, len(text)):
        if text[i] == "{":
            depth += 1
        elif text[i] == "}":
            depth -= 1
            if depth == 0:
                try:
                    val = json.loads(text[start:i + 1])
                    if isinstance(val, dict):
                        return val
                except (json.JSONDecodeError, ValueError):
                    pass
                break
    return None


# ═══════════════════════════════════════════════════════════════════
# 本地预处理
# ═══════════════════════════════════════════════════════════════════

def require(data: Dict[str, Any], key: str) -> Any:
    """Return ``data[key]``, rejecting an absent, ``None`` or empty-string value.

    Raises:
        ValueError: If the key is absent or its value is ``None`` or ``""``.
    """
    found = data.get(key)
    if found is not None and found != "":
        return found
    raise ValueError(f"missing required field: {key}")


def _local_normalize(records: List[Dict]) -> List[Dict]:
    result = []
    for r in records:
        result.append({
            "value": r.get("value", ""),
            "unit": r.get("unit", "mmol/L"),
            "measure_type": r.get("measure_type", ""),
            "measured_at": r.get("measured_at", ""),
            "note": r.get("note", ""),
        })
    return result


# ═══════════════════════════════════════════════════════════════════
# 核心逻辑
# ═══════════════════════════════════════════════════════════════════

# System message that build() passes to _call_llm with every request. It sets
# the assistant's role, lists the analysis tasks together with the reference
# glucose ranges, and fixes the output format (patient-facing Markdown that
# ends with a disclaimer). The text is sent verbatim, so edits change model
# behavior.
SYSTEM_PROMPT = """你是一位专业的慢病管理助手,擅长血糖数据分析。

你的任务:
1. 根据用户提供的血糖记录数据,进行结构化记录
2. 判断每条血糖值是否在正常范围(空腹3.9-6.1 mmol/L,餐后<7.8 mmol/L,随机<11.1 mmol/L)
3. 如果是多条记录,分析血糖趋势(上升/下降/波动/稳定)
4. 标注异常值并给出风险提示
5. 给出简洁实用的生活方式建议

输出要求:
- 输出自然语言Markdown文本,面向患者,通俗易懂
- 包含:血糖记录概览、趋势分析(多记录时)、异常提示、建议
- 禁止输出任何医嘱或诊断结论
- 末尾加上免责提示:本内容仅供参考,不替代医生诊疗
"""


def build(data: Union[Dict[str, Any], List[Dict[str, Any]]], appkey: str) -> Dict[str, Any]:
    """Normalize glucose records, request an analysis and assemble the result.

    Args:
        data: A single record (dict) or a list of records.
        appkey: Credential forwarded to ``_call_llm``.

    Returns:
        A dict with ``skill``, ``status``, ``data`` (record type, record count
        and the normalized records) and ``text`` (the stripped model reply).

    Raises:
        ValueError: If there are no records, or a record has no ``value``.
    """
    # 1. Local preprocessing: always work on a list of records.
    raw = data if isinstance(data, list) else [data]
    if not raw:
        # Fail before the API call: there is nothing to analyze.
        raise ValueError("no blood glucose records found in input")
    records = _local_normalize(raw)

    # 2. Validate required fields, reporting which record is incomplete.
    for position, rec in enumerate(records, start=1):
        try:
            require(rec, "value")
        except ValueError as exc:
            raise ValueError(f"record {position}: {exc}") from exc

    # 3. Build the user prompt with the records embedded as fenced JSON.
    user_prompt = f"请分析以下血糖监测数据:\n```json\n{json.dumps(records, ensure_ascii=False, indent=2)}\n```"

    # 4. Call the API.
    text = _call_llm(SYSTEM_PROMPT, user_prompt, appkey)

    return {
        "skill": "血糖监测记录",
        "status": "ok",
        "data": {
            "record_type": "blood_glucose",
            "record_count": len(records),
            "records": records,
        },
        # Guard against a null reply so .strip() cannot fail.
        "text": (text or "").strip(),
    }


# ═══════════════════════════════════════════════════════════════════
# 多格式输入解析
# ═══════════════════════════════════════════════════════════════════

def load_json(path: str) -> Union[Dict[str, Any], List[Any]]:
    """Read a JSON file and return its top-level object or array.

    Args:
        path: Path of the JSON file to read.

    Returns:
        The decoded ``dict`` or ``list``.

    Raises:
        ValueError: If the file is not valid JSON, or its top-level value is
            neither an object nor an array.
    """
    # "utf-8-sig" reads plain UTF-8 unchanged but also drops a leading BOM,
    # which json.loads would otherwise reject.
    data = json.loads(Path(path).read_text(encoding="utf-8-sig"))
    if not isinstance(data, (dict, list)):
        raise ValueError("input must be a JSON object or array")
    return data


def parse_text_kv(text: str, field_aliases: Dict[str, List[str]]) -> Dict[str, Any]:
    """Parse ``key: value`` lines into a dict keyed by canonical field name.

    Each non-blank line is matched as ``<key><colon><value>``. The key is
    looked up, after ``normalize_header``, among the canonical names and their
    aliases; lines that do not match or whose key is unknown are skipped. A
    value that is valid JSON (number, boolean, quoted string, ...) is decoded;
    anything else is kept as the raw string. A later line for the same field
    overwrites an earlier one.

    Args:
        text: Free-form text with one ``key: value`` pair per line.
        field_aliases: Canonical field name -> accepted spellings.

    Returns:
        Mapping from canonical field name to parsed value; empty when nothing
        matched.
    """
    result: Dict[str, Any] = {}
    lines = [line.strip() for line in text.strip().splitlines() if line.strip()]
    # Key: starts with a letter, a CJK ideograph (U+4E00-U+9FFF) or "_", then
    # letters/digits/CJK/"_"/whitespace. Separator: ASCII colon or full-width
    # colon (U+FF1A). Non-ASCII characters are written as escapes so they
    # cannot be lost or altered by punctuation normalization.
    pattern = re.compile(
        r"^\s*([A-Za-z\u4e00-\u9fff_][A-Za-z0-9\u4e00-\u9fff_\s]*)\s*[:\uff1a]\s*(.+?)\s*$"
    )
    header_map: Dict[str, str] = {}
    for canonical, aliases in field_aliases.items():
        # The canonical name is always accepted, even with an empty alias list.
        header_map[normalize_header(canonical)] = canonical
        for alias in aliases:
            header_map[normalize_header(alias)] = canonical
    for line in lines:
        match = pattern.match(line)
        if not match:
            continue
        raw_key = match.group(1).strip()
        value_str = match.group(2).strip()
        key = header_map.get(normalize_header(raw_key))
        if key is None:
            continue
        try:
            value: Any = json.loads(value_str)
        except ValueError:
            value = value_str
        result[key] = value
    return result


def normalize_artifact(
    artifact: Dict[str, Any], field_aliases: Dict[str, List[str]]
) -> Union[Dict[str, Any], List[Any]]:
    """Convert a loaded input artifact into a record dict or a list of records.

    Handles three artifact kinds:
      * ``"json"``: ``artifact["data"]`` is returned as-is when it is a dict
        or a list.
      * ``"text"``: the text is returned decoded if it is a JSON object or
        array, otherwise it is parsed with ``parse_text_kv``.
      * ``"tables"``: rows of all tables are concatenated; the first row is
        the header and every later row becomes one record.

    Args:
        artifact: Dict with a ``kind`` key plus ``data``, ``text`` or
            ``tables`` depending on the kind.
        field_aliases: Canonical field name -> accepted spellings.

    Returns:
        A single record dict, a list of records, or ``{}`` when a table has
        no usable data row.

    Raises:
        PreprocessError: For non-object/array JSON data, a table input without
            rows, or an unsupported ``kind``.
    """
    kind = artifact.get("kind")
    if kind == "json":
        data = artifact["data"]
        if isinstance(data, (dict, list)):
            return data
        raise PreprocessError("JSON input must be an object or array.")
    if kind == "text":
        text = artifact.get("text", "")
        try:
            data = json.loads(text)
            if isinstance(data, (dict, list)):
                return data
        except ValueError:
            pass
        return parse_text_kv(text, field_aliases)
    if kind == "tables":
        all_rows: List[List[str]] = []
        for table in artifact.get("tables", []):
            all_rows.extend(table.get("rows", []))
        if not all_rows:
            raise PreprocessError("No data rows found in table input.")
        header_row = all_rows[0]
        # Built once: it depends only on the header row, not on the alias
        # being tried. NOTE(review): assumes first_matching_index does not
        # mutate the mapping it is given; confirm against preprocess.py.
        header_index = {
            normalize_header(cell): i
            for i, cell in enumerate(header_row)
            if cell.strip()
        }
        col_map: Dict[str, int] = {}
        for canonical, aliases in field_aliases.items():
            for alias in aliases:
                idx = first_matching_index(header_index, (alias, canonical))
                if idx is not None:
                    col_map[canonical] = idx
                    break
        records = []
        for data_row in all_rows[1:]:
            rec = {}
            for key, idx in col_map.items():
                # Skip cells that are missing (short row) or blank.
                if idx < len(data_row) and data_row[idx].strip():
                    val = data_row[idx].strip()
                    try:
                        rec[key] = json.loads(val)
                    except ValueError:
                        rec[key] = val
            if rec:
                records.append(rec)
        if not records:
            return {}
        # A single data row is returned as a bare record, not a one-item list.
        return records if len(records) > 1 else records[0]
    raise PreprocessError(f"Unsupported artifact kind: {kind}")


def write_json(data: Dict[str, Any], output: str) -> None:
    """Serialize ``data`` as indented, non-ASCII-preserving JSON.

    With a non-empty ``output`` path the JSON (plus a trailing newline) is
    written to that file as UTF-8, creating parent directories as needed.
    With an empty ``output`` it is printed to stdout instead.
    """
    rendered = json.dumps(data, ensure_ascii=False, indent=2)
    if not output:
        print(rendered)
        return
    target = Path(output)
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_text(rendered + "\n", encoding="utf-8")


# ═══════════════════════════════════════════════════════════════════
# 入口
# ═══════════════════════════════════════════════════════════════════

def _process_input(args: argparse.Namespace, input_path: Path,
                   detect: Any, load_artifact: Any) -> None:
    """Load the input, optionally save the prepared JSON, then build and write the result.

    ``detect`` and ``load_artifact`` are the ``detect_input_type`` and
    ``load_input_artifact`` callables of whichever preprocess module is in use,
    so the primary and the fallback path share a single pipeline.
    """
    input_type = detect(input_path, args.input_type)
    if input_type == "json":
        data = load_json(args.input)
    else:
        artifact = load_artifact(input_path, input_type, args.encoding, args.sheet)
        data = normalize_artifact(artifact, FIELD_ALIASES)

    if args.save_prepared:
        # Write next to the output file when one is given, else next to the input.
        base_path = Path(args.output) if args.output else input_path
        prepared_path = base_path.with_suffix(".prepared.json")
        prepared_path.parent.mkdir(parents=True, exist_ok=True)
        prepared_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")

    result = build(data, args.appkey)
    write_json(result, args.output)


def main() -> int:
    """Command-line entry point.

    Parses the arguments, loads and normalizes the input file, calls ``build``
    and writes the result. If the local preprocess module raises
    ``PreprocessError``, the same pipeline is retried with
    ``_shared/doc-preprocess/scripts/preprocess.py`` when that directory exists.

    Returns:
        ``0`` on success, ``1`` on any failure (the error is printed to stderr).
    """
    parser = argparse.ArgumentParser(description="血糖监测记录 — 结构化血糖数据并调用AI分析")
    parser.add_argument("--input", required=True, help="输入文件路径")
    parser.add_argument("--output", default="", help="输出JSON路径,不传则输出到stdout")
    parser.add_argument("--input-type", default="auto",
                        choices=["auto", *sorted(SUPPORTED_FILE_TYPES)],
                        help="输入类型,默认auto自动检测")
    parser.add_argument("--sheet", default="", help="xlsx指定sheet名")
    parser.add_argument("--encoding", default="utf-8", help="txt/csv编码,默认utf-8")
    parser.add_argument("--save-prepared", action="store_true", help="保存预处理后的JSON便于调试")
    # NOTE(review): a key passed on the command line is visible in process
    # listings and shell history; consider also accepting it from the environment.
    parser.add_argument("--appkey", required=True, help="内部医疗大模型鉴权key(必填)")
    args = parser.parse_args()

    input_path = Path(args.input)
    try:
        if not input_path.exists():
            print(f"ERROR: Input file not found: {input_path}", file=sys.stderr)
            return 1
        _process_input(args, input_path, detect_input_type, load_input_artifact)
        return 0
    except PreprocessError as exc:
        # Fall back to _shared/doc-preprocess and retry the whole pipeline.
        try:
            shared_dir = Path(__file__).resolve().parent.parents[3] / "_shared" / "doc-preprocess" / "scripts"
            if not shared_dir.exists():
                print(f"ERROR: 无法读取输入文件,本地预处理失败且 _shared/doc-preprocess 不可用。原因:{exc}", file=sys.stderr)
                return 1
            import importlib.util as _iu
            spec = _iu.spec_from_file_location("_shared_preprocess", shared_dir / "preprocess.py")
            if spec is None or spec.loader is None:
                # Report a clear cause instead of an AttributeError on None.
                raise ImportError(f"cannot load {shared_dir / 'preprocess.py'}")
            shared = _iu.module_from_spec(spec)
            spec.loader.exec_module(shared)
            _process_input(args, input_path, shared.detect_input_type, shared.load_input_artifact)
            return 0
        except Exception as shared_exc:
            print(f"ERROR: 无法读取输入文件。本地预处理失败:{exc};_shared/doc-preprocess 回退也失败:{shared_exc}", file=sys.stderr)
            return 1
    except Exception as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 1


# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())