文件预览

write_bitable.py

查看 Social Media Data Collector 技能包中的文件内容。

文件内容

scripts/write_bitable.py

#!/usr/bin/env python3
"""
Write collected metrics to Feishu Bitable (多维表格).

Usage:
    python3 write_bitable.py --app-token XXX --table-id YYY --data results.json
    python3 write_bitable.py --app-token XXX --table-id YYY --data results.json --app-id CLI_ID --app-secret SECRET

Input format (results.json): array of objects with:
    {platform, record_id, metrics: {likes, comments, shares, collects, plays}}
Objects that contain an "error" key or lack a record_id are skipped.
"""

import json
import sys
import os
import argparse
from urllib.request import Request, urlopen
from datetime import datetime


def get_token(app_id, app_secret, timeout=30):
    """Get Feishu tenant access token.

    Args:
        app_id: Feishu app ID.
        app_secret: Feishu app secret.
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        The ``tenant_access_token`` value from the JSON response.

    Raises:
        ValueError: If ``app_id`` or ``app_secret`` is empty or None.
        RuntimeError: If the response contains no ``tenant_access_token``.
    """
    # Fail fast with a readable message instead of sending nulls to the API
    # and crashing later on a missing response key.
    if not app_id or not app_secret:
        raise ValueError(
            "Feishu app_id and app_secret are required "
            "(pass --app-id/--app-secret or set FEISHU_APP_ID/FEISHU_APP_SECRET)"
        )

    req = Request(
        "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal",
        data=json.dumps({"app_id": app_id, "app_secret": app_secret}).encode(),
        headers={"Content-Type": "application/json"},
    )
    # Context manager closes the connection even if reading/parsing fails.
    with urlopen(req, timeout=timeout) as resp:
        body = json.loads(resp.read())

    token = body.get("tenant_access_token")
    if not token:
        # NOTE(review): "code"/"msg" are assumed to be the API's error fields,
        # mirroring how main() reads batch_update responses — confirm.
        raise RuntimeError(
            f"Failed to get tenant access token: {body.get('msg')} "
            f"(code={body.get('code')})"
        )
    return token


def build_update_fields(metrics, date_str=None):
    """Convert a metrics dict to the bitable field format.

    Field types expected by the target table:
    - 点赞, 评论, 分享, 收藏 are NUMBER fields → written as int
    - 播放量 is a TEXT field → written as a string such as "1.2万"
    - 数据统计日期 is a TEXT field → written as given, e.g. "2026.5.15"

    互动量合计 is not written by this function.

    Args:
        metrics: Dict with optional keys ``likes``, ``comments``, ``shares``,
            ``collects`` and ``plays``. ``None`` is treated as an empty dict.
            Keys that are missing or ``None`` are left out of the result.
        date_str: Optional stats date; added as 数据统计日期 when truthy.

    Returns:
        Dict mapping bitable column names to values.
    """
    # A result may carry "metrics": null; treat that like "no metrics".
    metrics = metrics or {}
    fields = {}

    # NUMBER columns: metric key -> bitable column name.
    number_columns = (
        ("likes", "点赞"),
        ("comments", "评论"),
        ("shares", "分享"),
        ("collects", "收藏"),
    )
    for key, column in number_columns:
        value = metrics.get(key)
        if value is not None:
            fields[column] = int(value)

    plays = metrics.get("plays")
    if plays is not None:
        # plays can already be a display string like "20万", or a number.
        if isinstance(plays, (int, float)):
            if plays >= 10000:
                # Express large counts in units of 万 (10,000), one decimal.
                fields["播放量"] = f"{plays/10000:.1f}万"
            else:
                fields["播放量"] = str(int(plays))
        else:
            fields["播放量"] = str(plays)

    if date_str:
        fields["数据统计日期"] = date_str

    return fields


def batch_update(token, app_token, table_id, updates, timeout=30):
    """Batch update bitable records.

    Args:
        token: Tenant access token, sent as a Bearer token.
        app_token: Bitable app token (part of the URL).
        table_id: Table ID (part of the URL).
        updates: List of ``{"record_id": ..., "fields": {...}}`` dicts,
            sent as the ``records`` array of the request body.
        timeout: Socket timeout in seconds for the HTTP request.

    Returns:
        The parsed JSON response. For a non-2xx response whose body is a JSON
        object, that object is returned so the caller can inspect its
        ``code``/``msg``.

    Raises:
        urllib.error.HTTPError: For a non-2xx response without a JSON object body.
        urllib.error.URLError: For connection-level failures.
    """
    from urllib.error import HTTPError

    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
    payload = json.dumps({"records": updates}).encode()
    req = Request(url, data=payload, headers={
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json"
    }, method="POST")

    try:
        # Context manager closes the connection even if reading/parsing fails.
        with urlopen(req, timeout=timeout) as resp:
            return json.loads(resp.read())
    except HTTPError as err:
        # urlopen raises on non-2xx statuses, which would bypass the caller's
        # code/msg check. NOTE(review): assumes the API puts a JSON error body
        # on such responses — confirm against the Feishu docs.
        try:
            body = json.loads(err.read())
        except ValueError:
            body = None
        if isinstance(body, dict):
            return body
        raise


def main():
    """CLI entry point: read a results JSON file and push metrics to a bitable.

    Parses the command-line arguments, builds one update per usable result,
    sends the updates in batches and prints a summary. Results that carry an
    "error" key, lack a record_id, or have no metric values are reported as
    skipped. Exits with status 1 if any batch is rejected, and with an
    argparse usage error if the Feishu credentials are missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--app-token", required=True, help="Bitable app token")
    parser.add_argument("--table-id", required=True, help="Table ID")
    parser.add_argument("--data", required=True, help="Results JSON file")
    parser.add_argument("--app-id", default=os.environ.get("FEISHU_APP_ID"), help="Feishu app ID (or set FEISHU_APP_ID env var)")
    parser.add_argument("--app-secret", default=os.environ.get("FEISHU_APP_SECRET"), help="Feishu app secret (or set FEISHU_APP_SECRET env var)")
    parser.add_argument("--date", default=None, help="Stats date (default: today)")
    args = parser.parse_args()

    # Both values default to env vars and may therefore be None.
    if not args.app_id or not args.app_secret:
        parser.error(
            "Feishu credentials missing: pass --app-id/--app-secret "
            "or set FEISHU_APP_ID/FEISHU_APP_SECRET"
        )

    # Default date: today, in the unpadded "Y.M.D" form used by the table.
    if not args.date:
        now = datetime.now()
        args.date = f"{now.year}.{now.month}.{now.day}"

    # Explicit encoding so non-ASCII content does not depend on the locale.
    with open(args.data, encoding="utf-8") as f:
        results = json.load(f)

    updates = []
    skipped = []

    for r in results:
        if not isinstance(r, dict):
            skipped.append("unknown")
            continue

        # str(...) keeps the final join safe when platform is null/non-string.
        label = str(r.get("platform") or "unknown")

        if "error" in r or not r.get("record_id"):
            skipped.append(label)
            continue

        metrics = r.get("metrics") or {}
        # Check without the date first: the date alone would always make the
        # field dict non-empty and stamp records that have no metrics at all.
        if not build_update_fields(metrics):
            skipped.append(label)
            continue

        fields = build_update_fields(metrics, args.date)
        updates.append({"record_id": r["record_id"], "fields": fields})

    failed = False
    if not updates:
        print("No records to update")
    else:
        # Only request a token once there is something to send.
        token = get_token(args.app_id, args.app_secret)

        # Batch in groups of 500 (API limit per the original note — TODO confirm)
        for i in range(0, len(updates), 500):
            batch = updates[i:i+500]
            resp = batch_update(token, args.app_token, args.table_id, batch)
            code = resp.get("code", -1)
            if code == 0:
                print(f"✅ Updated {len(batch)} records")
            else:
                failed = True
                print(f"❌ Error: {resp.get('msg')} (code={code})")
                # Dump the first records of the failed batch for debugging
                print(json.dumps(batch[:2], ensure_ascii=False, indent=2))

    if skipped:
        print(f"⚠️ Skipped: {', '.join(skipped)}")

    # Let shells and schedulers see that at least one batch was rejected.
    if failed:
        sys.exit(1)

# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()