文件预览

publish.sh

查看 Wechat Publisher DraftGet 技能包中的文件内容。

文件内容

scripts/publish.sh

#!/usr/bin/env bash
# wechat-publisher: 使用 md2wechat create_draft 正式推送微信公众号草稿并核验
# Usage: ./publish.sh <article.md|article.html> <cover-image> [title] [author] [digest]

set -euo pipefail

RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m'

usage() {
  cat <<'EOF'
Usage:
  publish.sh <article.md|article.html> <cover-image> [title] [author] [digest]

正式链路:
  md2wechat inspect/preview/convert(若输入 Markdown)
  -> md2wechat upload_image cover
  -> 生成 draft.json
  -> md2wechat create_draft draft.json
  -> 微信 draft/get 后台核验

Examples:
  ./scripts/publish.sh article.html cover.png "文章标题" "野哥" "摘要"
  ./scripts/publish.sh article.md cover.png "文章标题" "野哥"

Notes:
  - 输入 HTML 时默认认为它已经是公众号兼容 HTML。
  - 输入 Markdown 时脚本会先用 md2wechat 转 HTML;AI 模式生成的 HTML应由外层 Agent 先落盘后再传入本脚本。
EOF
}

fail() { echo -e "${RED}❌ $*${NC}" >&2; exit 1; }
info() { echo -e "${YELLOW}$*${NC}"; }
ok() { echo -e "${GREEN}$*${NC}"; }

if [[ $# -lt 1 || "${1:-}" == "-h" || "${1:-}" == "--help" ]]; then
  usage
  exit 0
fi

ARTICLE_PATH="$1"
COVER_PATH="${2:-}"
TITLE="${3:-}"
AUTHOR="${4:-野哥}"
DIGEST="${5:-}"

[[ -f "$ARTICLE_PATH" ]] || fail "文章文件不存在: $ARTICLE_PATH"
[[ -n "$COVER_PATH" ]] || fail "必须提供封面图片路径"
[[ -f "$COVER_PATH" ]] || fail "封面图片不存在: $COVER_PATH"
command -v md2wechat >/dev/null 2>&1 || fail "md2wechat CLI 未安装或不在 PATH"
command -v python3 >/dev/null 2>&1 || fail "python3 不可用"

WORKDIR="$(cd "$(dirname "$ARTICLE_PATH")" && pwd)"
ARTICLE_ABS="$(cd "$(dirname "$ARTICLE_PATH")" && pwd)/$(basename "$ARTICLE_PATH")"
COVER_ABS="$(cd "$(dirname "$COVER_PATH")" && pwd)/$(basename "$COVER_PATH")"
BASE_NAME="$(basename "$ARTICLE_PATH")"
STEM="${BASE_NAME%.*}"
HTML_PATH="$ARTICLE_ABS"
DRAFT_JSON="$WORKDIR/${STEM}.md2wechat-create-draft.json"
CREATE_RESULT="$WORKDIR/${STEM}.md2wechat-create-draft-result.json"
VERIFY_JSON="$WORKDIR/${STEM}.md2wechat-create-draft-verify.json"
UPLOAD_JSON="$WORKDIR/${STEM}.md2wechat-cover-upload.json"

info "🔎 校验 md2wechat 配置..."
md2wechat config validate --json >/dev/null

if [[ "$ARTICLE_ABS" =~ \.md$|\.markdown$ ]]; then
  info "🔎 inspect / preview Markdown..."
  (cd "$WORKDIR" && md2wechat inspect "$ARTICLE_ABS" --json > "$WORKDIR/${STEM}.md2wechat-inspect.json")
  (cd "$WORKDIR" && md2wechat preview "$ARTICLE_ABS" --json > "$WORKDIR/${STEM}.md2wechat-preview.json")
  HTML_PATH="$WORKDIR/${STEM}.md2wechat-layout.html"
  info "🧩 转换 Markdown 为 HTML: $HTML_PATH"
  (cd "$WORKDIR" && md2wechat convert "$ARTICLE_ABS" --output "$HTML_PATH" --json > "$WORKDIR/${STEM}.md2wechat-convert.json")
fi

[[ -f "$HTML_PATH" ]] || fail "HTML 产物不存在: $HTML_PATH"

info "🖼️ 上传封面..."
(cd "$WORKDIR" && md2wechat upload_image "$COVER_ABS" --json > "$UPLOAD_JSON")

info "🧾 生成 draft.json..."
python3 - "$HTML_PATH" "$UPLOAD_JSON" "$DRAFT_JSON" "$TITLE" "$AUTHOR" "$DIGEST" <<'PY'
import json, pathlib, re, sys
html_path=pathlib.Path(sys.argv[1])
upload_json=pathlib.Path(sys.argv[2])
out=pathlib.Path(sys.argv[3])
title=sys.argv[4].strip()
author=sys.argv[5].strip() or '野哥'
digest=sys.argv[6].strip()
html=html_path.read_text(encoding='utf-8')
upload=json.loads(upload_json.read_text(encoding='utf-8'))
media_id=(upload.get('data') or {}).get('media_id')
if not media_id:
    raise SystemExit('封面上传结果中没有 media_id')
if not title:
    m=re.search(r'<h1[^>]*>(.*?)</h1>', html, flags=re.I|re.S)
    if m:
        title=re.sub(r'<[^>]+>','',m.group(1)).strip()
if not title:
    title=html_path.stem
if len(title)>32:
    title=title[:32]
if len(author)>16:
    author=author[:16]
if not digest:
    text=re.sub(r'(?is)<(script|style)[^>]*>.*?</\\1>',' ',html)
    text=re.sub(r'<[^>]+>',' ',text)
    text=' '.join(text.split())
    digest=text[:120]
if len(digest)>128:
    digest=digest[:128]
draft={'articles':[{'title':title,'author':author,'digest':digest,'content':html,'thumb_media_id':media_id,'show_cover_pic':0}]}
out.write_text(json.dumps(draft,ensure_ascii=False,indent=2),encoding='utf-8')
print(json.dumps({'draft_json':str(out),'title':title,'author':author,'digest_len':len(digest),'content_len':len(html),'thumb_media_id':media_id},ensure_ascii=False,indent=2))
PY

info "🚀 create_draft 推送草稿..."
(cd "$WORKDIR" && md2wechat create_draft "$DRAFT_JSON" --json > "$CREATE_RESULT")
cat "$CREATE_RESULT"

info "✅ draft/get 后台核验..."
python3 - "$CREATE_RESULT" "$DRAFT_JSON" "$VERIFY_JSON" <<'PY'
import json, os, pathlib, sys, urllib.parse, urllib.request, yaml
create_result=pathlib.Path(sys.argv[1])
draft_json=pathlib.Path(sys.argv[2])
out=pathlib.Path(sys.argv[3])
created=json.loads(create_result.read_text(encoding='utf-8'))
media_id=(created.get('data') or {}).get('media_id')
if not media_id:
    raise SystemExit('create_draft 结果中没有 media_id')
draft_req=json.loads(draft_json.read_text(encoding='utf-8'))
expected=(draft_req.get('articles') or [{}])[0]
cfg_path=pathlib.Path.home()/'.config/md2wechat/config.yaml'
cfg=yaml.safe_load(cfg_path.read_text(encoding='utf-8')) or {}
appid=(cfg.get('wechat') or {}).get('appid') or os.getenv('WECHAT_APPID')
secret=(cfg.get('wechat') or {}).get('secret') or os.getenv('WECHAT_SECRET')
if not appid or not secret:
    raise SystemExit('缺少 WECHAT_APPID / WECHAT_SECRET 或 md2wechat config wechat 配置')
url='https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid='+urllib.parse.quote(appid)+'&secret='+urllib.parse.quote(secret)
with urllib.request.urlopen(url, timeout=20) as r:
    token_resp=json.loads(r.read().decode())
result={'target_media_id':media_id,'token_response':{k:('***' if k=='access_token' else v) for k,v in token_resp.items()}}
if 'access_token' not in token_resp:
    result.update({'passed':False,'error':'no access_token'})
else:
    req=urllib.request.Request('https://api.weixin.qq.com/cgi-bin/draft/get?access_token='+urllib.parse.quote(token_resp['access_token']), data=json.dumps({'media_id':media_id},ensure_ascii=False).encode(), headers={'Content-Type':'application/json'}, method='POST')
    with urllib.request.urlopen(req, timeout=20) as r:
        draft_resp=json.loads(r.read().decode())
    item=(draft_resp.get('news_item') or [{}])[0] if isinstance(draft_resp.get('news_item'), list) else {}
    content=item.get('content','') or ''
    checks={
        'errcode_ok': draft_resp.get('errcode') in (None,0),
        'title': item.get('title'),
        'title_ok': item.get('title')==expected.get('title'),
        'author': item.get('author'),
        'author_ok': item.get('author')==expected.get('author'),
        'digest': item.get('digest'),
        'thumb_media_id_present': bool(item.get('thumb_media_id')),
        'content_length': len(content),
        'content_has_local_path': '/home/ye/' in content or 'content-factory/' in content,
        'content_has_inline_style': 'style=' in content,
        'url_present': bool(item.get('url')),
        'thumb_url_present': bool(item.get('thumb_url')),
    }
    result.update({'draft_get_response':draft_resp,'checks':checks,'passed':checks['errcode_ok'] and checks['title_ok'] and checks['author_ok'] and checks['thumb_media_id_present'] and not checks['content_has_local_path']})
out.write_text(json.dumps(result,ensure_ascii=False,indent=2),encoding='utf-8')
print(json.dumps(result,ensure_ascii=False,indent=2))
if not result.get('passed'):
    raise SystemExit(1)
PY

ok "🎉 草稿推送并核验通过"
echo "create_result: $CREATE_RESULT"
echo "verify_json:   $VERIFY_JSON"