文件内容
scripts/media_monitor.py
#!/usr/bin/env python3
"""
自媒体账号数据监测工具
支持平台:小红书、抖音、公众号
通过 Cookie 注入 + Playwright 访问创作者中心,抓取账号数据
支持单篇笔记/文章完整互动数据
用法:
python3 media_monitor.py --platform xhs --action fetch # 抓取小红书汇总数据
python3 media_monitor.py --platform xhs --action note-details # 抓取每篇笔记完整互动数据
python3 media_monitor.py --platform xhs --action report # 生成分析报告
python3 media_monitor.py --platform xhs --action fetch-and-report # 抓取+报告
python3 media_monitor.py --platform xhs --action analyze # 完整分析(汇总+单篇+两层分析)
python3 media_monitor.py --platform douyin --action fetch # 抓取抖音数据
python3 media_monitor.py --platform wechat --action article-stats # 抓取公众号文章数据
python3 media_monitor.py --platform wechat --action analyze # 公众号完整分析
python3 media_monitor.py --platform all --action fetch-and-report # 全平台
python3 media_monitor.py --platform xhs --action check-cookie # 检查Cookie
Cookie 获取方式:
小红书:浏览器登录 creator.xiaohongshu.com → F12 → Network → 复制Cookie
抖音:浏览器登录 creator.douyin.com → F12 → Network → 复制Cookie
公众号:浏览器登录 mp.weixin.qq.com → F12 → Network → 复制Cookie
"""
import argparse
import json
import os
import sys
import time
from datetime import datetime, timedelta
from pathlib import Path
from collections import defaultdict
# ============ 路径配置 ============
SCRIPT_DIR = Path(__file__).parent
SKILL_ROOT = SCRIPT_DIR.parent
DATA_DIR = SKILL_ROOT / 'data'
DATA_DIR.mkdir(exist_ok=True)
ENV_FILE = SKILL_ROOT / '.env'
# 小红书Cookie文件(复用xhs-publisher的Cookie)
XHS_COOKIE_FILE = Path('/home/z/my-project/skills/xhs-publisher/scripts/.cookies/xhs_cookie.txt')
# 小红书用户ID
XHS_USER_ID = '5f02f3630000000001000859'
# ============ Cookie 管理 ============
def load_cookie(platform):
"""加载指定平台的Cookie"""
if platform == 'xhs':
if XHS_COOKIE_FILE.exists():
cookie = XHS_COOKIE_FILE.read_text().strip()
if cookie:
return cookie
if ENV_FILE.exists():
for line in ENV_FILE.read_text().splitlines():
if line.strip().startswith('XHS_COOKIE='):
return line.split('=', 1)[1].strip().strip('"').strip("'")
return None
elif platform == 'douyin':
if ENV_FILE.exists():
for line in ENV_FILE.read_text().splitlines():
if line.strip().startswith('DOUYIN_COOKIE='):
return line.split('=', 1)[1].strip().strip('"').strip("'")
return None
elif platform == 'wechat':
if ENV_FILE.exists():
for line in ENV_FILE.read_text().splitlines():
if line.strip().startswith('WECHAT_MP_COOKIE='):
return line.split('=', 1)[1].strip().strip('"').strip("'")
return None
return None
def _parse_cookie_dict(cookie_str):
"""将Cookie字符串转为dict"""
result = {}
for item in cookie_str.split(';'):
item = item.strip()
if '=' in item:
name, value = item.split('=', 1)
result[name.strip()] = value.strip()
return result
def _parse_a1_web_session(cookie_str):
"""从Cookie中提取a1和web_session"""
a1 = ''
web_session = ''
for item in cookie_str.split(';'):
item = item.strip()
if '=' in item:
name, value = item.split('=', 1)
if name.strip() == 'a1':
a1 = value.strip()
elif name.strip() == 'web_session':
web_session = value.strip()
return a1, web_session
# ============ 小红书:汇总数据抓取 ============
def _xhs_cookie_to_playwright(cookie_str):
"""将Cookie字符串转为Playwright格式"""
cookies = []
for item in cookie_str.split(';'):
item = item.strip()
if '=' in item:
name, value = item.split('=', 1)
cookies.append({
'name': name.strip(),
'value': value.strip(),
'domain': '.xiaohongshu.com',
'path': '/'
})
return cookies
def fetch_xhs_data(cookie=None):
"""抓取小红书创作者汇总数据(账号信息、粉丝、笔记概览等)"""
from playwright.sync_api import sync_playwright
cookie = cookie or load_cookie('xhs')
if not cookie:
print("❌ 未找到小红书Cookie")
print(" 获取方式:浏览器登录 creator.xiaohongshu.com → F12 → Network → 复制Cookie")
return None
result = {
'platform': 'xhs',
'fetch_time': datetime.now().isoformat(),
'user_info': {},
'account_data': {},
'fans_overview': {},
'fans_portrait': {},
'note_overview': {},
'latest_note': {},
'raw_api_count': 0,
}
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
viewport={'width': 1280, 'height': 800}
)
context.add_cookies(_xhs_cookie_to_playwright(cookie))
page = context.new_page()
api_data = {}
def handle_response(response):
url = response.url
if '/api/galaxy/' in url and '.js' not in url:
path = url.split('?')[0].split('/api/galaxy/')[-1]
try:
body = response.json()
api_data[path] = body
except:
pass
page.on('response', handle_response)
page.goto('https://creator.xiaohongshu.com/creator/home', timeout=30000, wait_until='domcontentloaded')
page.wait_for_timeout(5000)
# 直接请求额外API
extra_apis = [
'/api/galaxy/creator/data/fans/overall_new',
'/api/galaxy/creator/data/fans_portrait_new',
'/api/galaxy/creator/data/fans_source',
]
for api_path in extra_apis:
url = f'https://creator.xiaohongshu.com{api_path}'
try:
resp_text = page.evaluate(f'''
async () => {{
const resp = await fetch("{url}");
return await resp.text();
}}
''')
data = json.loads(resp_text)
api_data[api_path] = data
except Exception as e:
print(f" ⚠️ {api_path} 请求失败: {str(e)[:80]}")
result['raw_api_count'] = len(api_data)
# 解析用户信息
if 'creator/home/personal_info' in api_data:
info = api_data['creator/home/personal_info'].get('data', {})
result['user_info'] = {
'nickname': info.get('name', ''),
'red_id': info.get('red_num', ''),
'avatar': info.get('avatar', ''),
'fans_count': info.get('fans_count', 0),
'follow_count': info.get('follow_count', 0),
'faved_count': info.get('faved_count', 0),
'desc': info.get('personal_desc', ''),
}
# 解析账号数据概览
if 'v2/creator/datacenter/account/base' in api_data:
base = api_data['v2/creator/datacenter/account/base'].get('data', {})
seven = base.get('seven', {})
thirty = base.get('thirty', {})
result['account_data'] = {
'7day': {
'impression': seven.get('impl_count', 0),
'publish_notes': seven.get('publish_normal_note_num_list', []),
},
'30day': {
'cover_click_rate': thirty.get('cover_click_rate', 0),
'impression': thirty.get('impl_count_list', []),
},
'analyse': base.get('analyse_infos', []),
}
# 解析粉丝总览
if 'creator/data/fans/overall_new' in api_data:
fans = api_data['creator/data/fans/overall_new'].get('data', {})
seven = fans.get('seven', {})
thirty = fans.get('thirty', {})
result['fans_overview'] = {
'total_fans': seven.get('fans_count', 0),
'7day_new_fans': seven.get('rise_fans_count', 0),
'7day_lost_fans': seven.get('leave_fans_count', 0),
'30day_new_fans': thirty.get('rise_fans_count', 0),
'30day_lost_fans': thirty.get('leave_fans_count', 0),
'7day_trend': seven.get('rise_fans_list', []),
'30day_trend': thirty.get('rise_fans_list', []),
}
# 解析粉丝画像
if 'creator/data/fans_portrait_new' in api_data:
portrait = api_data['creator/data/fans_portrait_new'].get('data', {})
result['fans_portrait'] = {
'gender': {item.get('title', ''): item.get('value', 0) for item in portrait.get('gender', [])},
'age': {item.get('title', ''): item.get('value', 0) for item in portrait.get('age', [])},
'city': {item.get('title', ''): item.get('value', 0) for item in portrait.get('city', [])},
'interest': {item.get('title', ''): item.get('value', 0) for item in portrait.get('interest', [])},
}
# 解析笔记数据概览
if 'creator/data/note_detail_new' in api_data:
note = api_data['creator/data/note_detail_new'].get('data', {})
seven = note.get('seven', {})
thirty = note.get('thirty', {})
result['note_overview'] = {
'7day': {
'views': seven.get('view_count', 0),
'avg_view_time': seven.get('view_time_avg', 0),
'likes': seven.get('like_count', 0),
'collects': seven.get('collect_count', 0),
'comments': seven.get('comment_count', 0),
'shares': seven.get('share_count', 0),
},
'30day': {
'views': thirty.get('view_count', 0),
'avg_view_time': thirty.get('view_time_avg', 0),
'likes': thirty.get('like_count', 0),
'collects': thirty.get('collect_count', 0),
'comments': thirty.get('comment_count', 0),
'shares': thirty.get('share_count', 0),
},
'analyse': note.get('analyse_infos', []),
}
# 解析最新笔记
if 'creator/home/latest_note_data' in api_data:
latest = api_data['creator/home/latest_note_data'].get('data', {})
note_info = latest.get('noteInfo', {})
result['latest_note'] = {
'id': note_info.get('id', ''),
'title': note_info.get('title', ''),
'cover': note_info.get('coverUrl', ''),
}
browser.close()
return result
# ============ 小红书:单篇笔记数据抓取 ============
def fetch_xhs_note_list(cookie=None):
"""从用户主页获取笔记列表(标题+笔记ID+点赞数)"""
from playwright.sync_api import sync_playwright
cookie = cookie or load_cookie('xhs')
if not cookie:
print("❌ 未找到小红书Cookie")
return []
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
viewport={'width': 1280, 'height': 800}
)
context.add_cookies(_xhs_cookie_to_playwright(cookie))
page = context.new_page()
page.goto(f'https://www.xiaohongshu.com/user/profile/{XHS_USER_ID}', timeout=30000, wait_until='domcontentloaded')
page.wait_for_timeout(5000)
# 滚动加载更多(5次确保获取全部笔记)
for _ in range(5):
page.evaluate('window.scrollTo(0, document.body.scrollHeight)')
page.wait_for_timeout(2000)
# 提取笔记卡片
notes = page.evaluate('''
() => {
const sections = document.querySelectorAll('section.note-item');
const results = [];
sections.forEach(s => {
const linkEl = s.querySelector('a.cover, a[href*="/explore/"]');
const titleEl = s.querySelector('a.title span, a.title');
const likeEl = s.querySelector('.like-wrapper .count, .author-wrapper .count, span.count');
const href = linkEl ? (linkEl.getAttribute('href') || '') : '';
const noteId = href ? href.split('/')[2]?.split('?')[0] : '';
const title = titleEl ? titleEl.textContent.trim() : '';
const likes = likeEl ? likeEl.textContent.trim() : '';
if (noteId) {
results.push({noteId, title, likes});
}
});
return results;
}
''')
browser.close()
return notes
def fetch_xhs_note_details(note_ids=None, cookie=None):
"""获取每篇笔记的完整互动数据(赞/藏/评/转/阅读/涨粉)
流程:
1. 加载已有数据(xhs_note_details.json)
2. 从用户主页获取笔记列表(增量发现新笔记)
3. 合并去重,用签名API获取新笔记的详细数据
4. 保存更新后的完整数据
"""
import requests
from xhs.help import sign
cookie = cookie or load_cookie('xhs')
if not cookie:
print("❌ 未找到小红书Cookie")
return []
a1, web_session = _parse_a1_web_session(cookie)
cookie_dict = _parse_cookie_dict(cookie)
# 1. 加载已有数据
existing = load_note_details()
existing_ids = {d['note_id'] for d in existing if 'note_id' in d and not d.get('error')}
print(f" 已有 {len(existing_ids)} 篇笔记数据")
# 2. 从用户主页获取笔记列表(增量发现)
print(" 正在扫描用户主页...")
profile_notes = fetch_xhs_note_list(cookie)
profile_ids = {n['noteId'] for n in profile_notes if n.get('noteId')}
# 3. 合并:已有 + 主页新发现的
all_ids = existing_ids | profile_ids
new_ids = all_ids - existing_ids
if new_ids:
print(f" 发现 {len(new_ids)} 篇新笔记,正在获取详细数据...")
# 4. 获取新笔记的详细数据
results = list(existing) # 保留已有数据
for note_id in new_ids:
uri = f"/api/galaxy/creator/datacenter/note/base?note_id={note_id}"
try:
sign_result = sign(uri, a1=a1, b1=web_session)
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Referer': 'https://creator.xiaohongshu.com/',
'x-s': sign_result['x-s'],
'x-t': sign_result['x-t'],
'x-s-common': sign_result['x-s-common'],
}
resp = requests.get(
f'https://creator.xiaohongshu.com{uri}',
headers=headers,
cookies=cookie_dict,
timeout=15
)
data = resp.json()
if data.get('data'):
d = data['data']
note_info = d.get('note_info', {})
views = d.get('view_count', note_info.get('view_count', 0))
likes = d.get('like_count', 0)
collects = d.get('collect_count', 0)
comments = d.get('comment_count', 0)
shares = d.get('share_count', 0)
rise_fans = d.get('rise_fans_count', 0)
engage_rate = round((likes + collects + comments) / views * 100, 2) if views > 0 else 0
collect_rate = round(collects / views * 100, 2) if views > 0 else 0
share_rate = round(shares / views * 100, 2) if views > 0 else 0
comment_rate = round(comments / views * 100, 2) if views > 0 else 0
collect_like_ratio = round(collects / likes, 2) if likes > 0 else 0
results.append({
'note_id': note_id,
'title': note_info.get('desc', '')[:40],
'views': views,
'likes': likes,
'collects': collects,
'comments': comments,
'shares': shares,
'rise_fans': rise_fans,
'engage_rate': engage_rate,
'collect_rate': collect_rate,
'share_rate': share_rate,
'comment_rate': comment_rate,
'collect_like_ratio': collect_like_ratio,
'post_time': note_info.get('post_time', 0),
})
else:
results.append({'note_id': note_id, 'error': True})
except Exception as e:
results.append({'note_id': note_id, 'error': True, 'error_msg': str(e)[:100]})
time.sleep(0.3)
# 5. 保存更新后的完整数据
save_note_details(results)
print(f" ✅ 共 {len(results)} 篇笔记数据({len(existing_ids)} 已有 + {len(new_ids)} 新增)")
return results
# ============ 抖音数据抓取 ============
def _douyin_cookie_to_playwright(cookie_str):
"""将Cookie字符串转为Playwright格式"""
cookies = []
for item in cookie_str.split(';'):
item = item.strip()
if '=' in item:
name, value = item.split('=', 1)
cookies.append({
'name': name.strip(),
'value': value.strip(),
'domain': '.douyin.com',
'path': '/'
})
return cookies
def fetch_douyin_data(cookie=None):
"""抓取抖音创作者数据"""
from playwright.sync_api import sync_playwright
cookie = cookie or load_cookie('douyin')
if not cookie:
print("❌ 未找到抖音Cookie")
print(" 获取方式:浏览器登录 creator.douyin.com → F12 → Network → 复制Cookie")
return None
result = {
'platform': 'douyin',
'fetch_time': datetime.now().isoformat(),
'user_info': {},
'raw_api_count': 0,
}
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
viewport={'width': 1280, 'height': 800}
)
context.add_cookies(_douyin_cookie_to_playwright(cookie))
page = context.new_page()
api_data = {}
def handle_response(response):
url = response.url
if 'creator.douyin.com' in url and any(kw in url for kw in ['/aweme/v1/', '/web/api/']):
if '.js' not in url and '.css' not in url:
path = url.split('?')[0].split('douyin.com/')[-1]
try:
body = response.json()
api_data[path] = body
except:
pass
page.on('response', handle_response)
pages = [
'https://creator.douyin.com/creator-micro/data/summary',
'https://creator.douyin.com/creator-micro/data/content',
'https://creator.douyin.com/creator-micro/data/follower',
]
for url in pages:
try:
page.goto(url, timeout=15000, wait_until='domcontentloaded')
page.wait_for_timeout(3000)
except:
pass
result['raw_api_count'] = len(api_data)
for key in ['aweme/v1/creator/pc/user/info/', 'aweme/v1/creator/user/info/']:
if key in api_data:
info = api_data[key].get('data', {})
if info:
result['user_info'] = {
'nickname': info.get('nickname', ''),
'avatar': info.get('avatar_url', {}).get('url_list', [''])[0] if isinstance(info.get('avatar_url'), dict) else '',
'follower_count': info.get('follower_count', 0),
'following_count': info.get('following_count', 0),
'favoriting_count': info.get('favoriting_count', 0),
'aweme_count': info.get('aweme_count', 0),
}
break
login_ok = any(
v.get('status_code') == 0 and v.get('data')
for k, v in api_data.items()
if 'user/info' in k
)
if not login_ok:
result['_cookie_expired'] = True
browser.close()
return result
# ============ 公众号数据抓取 ============
def _wechat_cookie_to_playwright(cookie_str):
"""将Cookie字符串转为Playwright格式(mp.weixin.qq.com)"""
cookies = []
for item in cookie_str.split(';'):
item = item.strip()
if '=' in item:
name, value = item.split('=', 1)
cookies.append({
'name': name.strip(),
'value': value.strip(),
'domain': '.qq.com',
'path': '/'
})
return cookies
def fetch_wechat_article_stats(cookie=None):
"""抓取公众号已发布文章数据(阅读/点赞/在看/转发/收藏)
通过 Playwright + Cookie 访问 mp.weixin.qq.com 后台,
拦截内部API获取已发布文章列表及统计数据。
"""
from playwright.sync_api import sync_playwright
cookie = cookie or load_cookie('wechat')
if not cookie:
print("❌ 未找到公众号Cookie")
print(" 获取方式:浏览器登录 mp.weixin.qq.com → F12 → Network → 复制Cookie")
return None
result = {
'platform': 'wechat',
'fetch_time': datetime.now().isoformat(),
'user_info': {},
'articles': [],
'raw_api_count': 0,
}
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
context = browser.new_context(
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
viewport={'width': 1280, 'height': 800}
)
context.add_cookies(_wechat_cookie_to_playwright(cookie))
page = context.new_page()
api_data = {}
def handle_response(response):
url = response.url
# 拦截公众号后台的发布列表API
if 'mp.weixin.qq.com/cgi-bin/appmsgpublish' in url and 'sub=list' in url:
try:
body = response.json()
api_data['publish_list'] = body
except:
pass
# 拦截文章统计API
if 'mp.weixin.qq.com/cgi-bin/appmsgpublish' in url and 'sub=stat' in url:
try:
body = response.json()
api_data['stat'] = body
except:
pass
# 拦截账号信息API
if 'mp.weixin.qq.com/cgi-bin/home' in url or 'mp.weixin.qq.com/cgi-bin/bizlogin' in url:
try:
body = response.json()
api_data['home'] = body
except:
pass
page.on('response', handle_response)
# 访问公众号后台首页
try:
page.goto('https://mp.weixin.qq.com/', timeout=30000, wait_until='domcontentloaded')
page.wait_for_timeout(3000)
except Exception as e:
print(f" ⚠️ 访问后台失败: {str(e)[:80]}")
# 检查是否登录成功
page_content = page.content()
if '请登录' in page_content or '扫码登录' in page_content:
print(" ❌ Cookie已过期,请重新登录 mp.weixin.qq.com 获取Cookie")
browser.close()
result['_cookie_expired'] = True
return result
# 提取账号名称
try:
nickname = page.evaluate('''() => {
const el = document.querySelector('.weui-desktop-account__nickname, .account_setting_item .nickname, .weui-desktop-header__nickname');
return el ? el.textContent.trim() : '';
}''')
if nickname:
result['user_info']['nickname'] = nickname
except:
pass
# 访问已发布文章列表页面
try:
page.goto('https://mp.weixin.qq.com/cgi-bin/appmsgpublish?sub=list&search_field=null&begin=0&count=5&query=',
timeout=30000, wait_until='domcontentloaded')
page.wait_for_timeout(5000)
except Exception as e:
print(f" ⚠️ 访问文章列表失败: {str(e)[:80]}")
# 尝试从API拦截数据中提取文章列表
articles = []
if 'publish_list' in api_data:
publish_data = api_data['publish_list']
publish_list = publish_data.get('publish_page', {}).get('publish_list', [])
for item in publish_list:
info = item.get('publish_info', item.get('info', {}))
news_items = info.get('news_item', [])
for news in news_items:
article = {
'title': news.get('title', ''),
'url': news.get('url', ''),
'cover': news.get('thumb_url', ''),
'publish_time': news.get('create_time', 0),
'read_num': news.get('read_num', 0),
'like_num': news.get('like_num', 0),
'wow_num': news.get('wow_num', 0), # 在看
'share_num': news.get('share_num', 0), # 转发
'collect_num': news.get('collect_num', 0), # 收藏
'comment_num': news.get('comment_num', 0), # 评论
}
articles.append(article)
# 如果API拦截没拿到数据,尝试从页面DOM提取
if not articles:
print(" ⚠️ API拦截未获取到数据,尝试从页面DOM提取...")
try:
dom_articles = page.evaluate('''() => {
const results = [];
const items = document.querySelectorAll('.publish_article_list .article_item, .appmsg_item, .weui-desktop-card');
items.forEach(item => {
const titleEl = item.querySelector('.article_title, .appmsg_title, .weui-desktop-card__title');
const title = titleEl ? titleEl.textContent.trim() : '';
if (title) {
results.push({title});
}
});
return results;
}''')
if dom_articles:
articles = dom_articles
except:
pass
# 如果DOM也没拿到,尝试直接调用内部API(通过page.evaluate在页面上下文中fetch)
if not articles:
print(" ⚠️ DOM提取也未获取到数据,尝试直接调用内部API...")
try:
api_result = page.evaluate('''async () => {
const resp = await fetch('/cgi-bin/appmsgpublish?sub=list&search_field=null&begin=0&count=5&query=&type=101_1&free_publish_type=1&sub_action=list_ex');
const data = await resp.json();
return data;
}''')
if api_result:
api_data['fetch_api'] = api_result
publish_page = api_result.get('publish_page', {})
publish_list = publish_page.get('publish_list', [])
for item in publish_list:
info = item.get('publish_info', item.get('info', {}))
news_items = info.get('news_item', [])
for news in news_items:
article = {
'title': news.get('title', ''),
'url': news.get('url', ''),
'cover': news.get('thumb_url', ''),
'publish_time': news.get('create_time', 0),
'read_num': news.get('read_num', 0),
'like_num': news.get('like_num', 0),
'wow_num': news.get('wow_num', 0),
'share_num': news.get('share_num', 0),
'collect_num': news.get('collect_num', 0),
'comment_num': news.get('comment_num', 0),
}
articles.append(article)
except Exception as e:
print(f" ⚠️ 内部API调用失败: {str(e)[:80]}")
# 翻页获取更多文章(最多5页=25篇)
for page_num in range(1, 5):
if not articles:
break
offset = page_num * 5
try:
more_result = page.evaluate(f'''async () => {{
const resp = await fetch('/cgi-bin/appmsgpublish?sub=list&search_field=null&begin={offset}&count=5&query=&type=101_1&free_publish_type=1&sub_action=list_ex');
const data = await resp.json();
return data;
}}''')
if more_result:
publish_page = more_result.get('publish_page', {})
publish_list = publish_page.get('publish_list', [])
if not publish_list:
break
for item in publish_list:
info = item.get('publish_info', item.get('info', {}))
news_items = info.get('news_item', [])
for news in news_items:
article = {
'title': news.get('title', ''),
'url': news.get('url', ''),
'cover': news.get('thumb_url', ''),
'publish_time': news.get('create_time', 0),
'read_num': news.get('read_num', 0),
'like_num': news.get('like_num', 0),
'wow_num': news.get('wow_num', 0),
'share_num': news.get('share_num', 0),
'collect_num': news.get('collect_num', 0),
'comment_num': news.get('comment_num', 0),
}
articles.append(article)
time.sleep(1)
except:
break
result['articles'] = articles
result['raw_api_count'] = len(api_data)
browser.close()
return result
def load_wechat_article_details():
"""加载公众号文章详细数据"""
file = DATA_DIR / 'wechat_article_details.json'
if file.exists():
try:
return json.loads(file.read_text())
except:
return []
return []
def save_wechat_article_details(details):
"""保存公众号文章详细数据"""
file = DATA_DIR / 'wechat_article_details.json'
file.write_text(json.dumps(details, ensure_ascii=False, indent=2))
def generate_wechat_analysis():
"""生成公众号数据深度分析(两层分析框架)"""
lines = []
now = datetime.now()
lines.append(f"📊 公众号数据深度分析")
lines.append(f" 生成时间:{now.strftime('%Y-%m-%d %H:%M')}")
lines.append("")
# 加载文章数据
articles = load_wechat_article_details()
if not articles:
lines.append("❌ 暂无文章数据,请先执行 article-stats")
return '\n'.join(lines)
# 过滤有效数据(有阅读量的)
valid = [a for a in articles if a.get('read_num', 0) > 0 and a.get('title')]
if not valid:
# 如果没有阅读量数据,用有标题的文章
valid = [a for a in articles if a.get('title')]
if not valid:
lines.append("❌ 无有效文章数据")
return '\n'.join(lines)
lines.append("⚠️ 文章无阅读量数据(订阅号无datacube权限),仅展示文章列表")
lines.append("")
for i, a in enumerate(valid[:20]):
lines.append(f" {i+1}. {a['title'][:35]}")
return '\n'.join(lines)
# 计算衍生指标
for a in valid:
reads = a.get('read_num', 0)
likes = a.get('like_num', 0)
wows = a.get('wow_num', 0)
shares = a.get('share_num', 0)
collects = a.get('collect_num', 0)
comments = a.get('comment_num', 0)
a['engage_rate'] = round((likes + wows + comments) / reads * 100, 2) if reads > 0 else 0
a['collect_rate'] = round(collects / reads * 100, 2) if reads > 0 else 0
a['share_rate'] = round(shares / reads * 100, 2) if reads > 0 else 0
a['comment_rate'] = round(comments / reads * 100, 2) if reads > 0 else 0
a['wow_rate'] = round(wows / reads * 100, 2) if reads > 0 else 0
a['collect_like_ratio'] = round(collects / likes, 2) if likes > 0 else 0
# 汇总指标
total_reads = sum(a.get('read_num', 0) for a in valid)
total_likes = sum(a.get('like_num', 0) for a in valid)
total_wows = sum(a.get('wow_num', 0) for a in valid)
total_shares = sum(a.get('share_num', 0) for a in valid)
total_collects = sum(a.get('collect_num', 0) for a in valid)
total_comments = sum(a.get('comment_num', 0) for a in valid)
lines.append(f"📈 汇总指标({len(valid)}篇文章)")
lines.append(f" 总阅读:{total_reads:,} | 总点赞:{total_likes} | 总在看:{total_wows}")
lines.append(f" 总转发:{total_shares} | 总收藏:{total_collects} | 总评论:{total_comments}")
avg_reads = total_reads // len(valid) if valid else 0
lines.append(f" 篇均阅读:{avg_reads:,}")
lines.append("")
# ====== 第一层:行动帮助分析(收藏/点赞比)======
lines.append(f"🏆 行动帮助排行(藏/赞比 Top5)")
by_collect_ratio = sorted(valid, key=lambda x: x.get('collect_like_ratio', 0), reverse=True)
for i, a in enumerate(by_collect_ratio[:5]):
ratio = a.get('collect_like_ratio', 0)
cr = a.get('collect_rate', 0)
title = a.get('title', '?')[:28]
lines.append(f" {i+1}. {title:<29} | 藏/赞={ratio:.2f} | 藏率{cr:.1f}% | 读{a['read_num']:>5} 赞{a['like_num']:>3} 藏{a['collect_num']:>3}")
lines.append("")
# ====== 第二层:情感共鸣分析(转发率+在看率)======
lines.append(f"🔥 情感共鸣排行(转发率 Top5)")
by_share = sorted(valid, key=lambda x: x.get('share_rate', 0), reverse=True)
for i, a in enumerate(by_share[:5]):
sr = a.get('share_rate', 0)
wr = a.get('wow_rate', 0)
title = a.get('title', '?')[:28]
lines.append(f" {i+1}. {title:<29} | 转率{sr:.1f}% | 在看率{wr:.1f}% | 读{a['read_num']:>5} 转{a['share_num']:>2} 在看{a['wow_num']:>2}")
lines.append("")
# ====== 综合判断 ======
lines.append(f"💡 爆款信号")
best_help = by_collect_ratio[0] if by_collect_ratio else None
if best_help and best_help.get('collect_like_ratio', 0) > 1:
lines.append(f" 帮到人的内容:{best_help.get('title', '?')[:25]}(藏/赞={best_help.get('collect_like_ratio', 0):.2f})")
lines.append(f" → 读者不只是认同,是要留着用——说明给了切实的行动帮助")
else:
lines.append(f" 当前没有藏>赞的文章,内容偏情绪共鸣,需要增加行动指引")
best_share = by_share[0] if by_share else None
if best_share and best_share.get('share_rate', 0) > 1:
lines.append(f" 引发传播的内容:{best_share.get('title', '?')[:25]}(转率={best_share.get('share_rate', 0):.1f}%)")
lines.append(f" → 读者觉得别人也需要看——说明触发了强共鸣")
lines.append("")
# ====== 全部文章数据表 ======
lines.append(f"📋 全部文章数据(按阅读量排序)")
by_reads = sorted(valid, key=lambda x: x.get('read_num', 0), reverse=True)
for i, a in enumerate(by_reads):
title = a.get('title', '?')[:24]
lines.append(f" {i+1:>2}. {title:<25} | 读{a['read_num']:>5} 赞{a['like_num']:>3} 在看{a['wow_num']:>2} 藏{a['collect_num']:>3} 转{a['share_num']:>2} 评{a['comment_num']:>2} | 藏/赞={a.get('collect_like_ratio',0):.2f}")
return '\n'.join(lines)
# ============ 数据存储 ============
def load_history(platform):
"""加载历史数据"""
history_file = DATA_DIR / f'{platform}_history.json'
if history_file.exists():
try:
return json.loads(history_file.read_text())
except:
return []
return []
def save_history(platform, history):
"""保存历史数据"""
history_file = DATA_DIR / f'{platform}_history.json'
history_file.write_text(json.dumps(history, ensure_ascii=False, indent=2))
def load_note_details():
"""加载笔记详细数据"""
file = DATA_DIR / 'xhs_note_details.json'
if file.exists():
try:
return json.loads(file.read_text())
except:
return []
return []
def save_note_details(details):
"""保存笔记详细数据"""
file = DATA_DIR / 'xhs_note_details.json'
file.write_text(json.dumps(details, ensure_ascii=False, indent=2))
# ============ 分析报告 ============
def generate_report(platform='all'):
"""生成数据分析报告"""
platforms = ['xhs', 'douyin', 'wechat'] if platform == 'all' else [platform]
report_lines = []
now = datetime.now()
report_lines.append(f"📊 自媒体账号数据报告")
report_lines.append(f" 生成时间:{now.strftime('%Y-%m-%d %H:%M')}")
report_lines.append("")
for plat in platforms:
if plat == 'wechat':
# 公众号用独立的分析报告
report_lines.append(f"{'='*50}")
report_lines.append(f"📱 公众号")
report_lines.append(f"{'='*50}")
wechat_analysis = generate_wechat_analysis()
report_lines.append(wechat_analysis)
report_lines.append("")
continue
plat_name = '小红书' if plat == 'xhs' else '抖音'
history = load_history(plat)
report_lines.append(f"{'='*50}")
report_lines.append(f"📱 {plat_name}")
report_lines.append(f"{'='*50}")
if not history:
report_lines.append(" 暂无数据,请先执行 fetch 抓取")
report_lines.append("")
continue
latest = history[-1]
user = latest.get('user_info', {})
if not user.get('nickname'):
report_lines.append(" ⚠️ Cookie可能已过期,未获取到用户信息")
report_lines.append("")
continue
# 基本信息
report_lines.append(f"👤 账号:{user.get('nickname', '')}")
if plat == 'xhs':
report_lines.append(f" 红书号:{user.get('red_id', '')}")
report_lines.append(f" 粉丝:{user.get('fans_count', 0):,}")
report_lines.append(f" 关注:{user.get('follow_count', 0):,}")
report_lines.append(f" 获赞:{user.get('faved_count', 0):,}")
else:
report_lines.append(f" 粉丝:{user.get('follower_count', 0):,}")
report_lines.append(f" 关注:{user.get('following_count', 0):,}")
report_lines.append(f" 获赞:{user.get('favoriting_count', 0):,}")
report_lines.append(f" 作品:{user.get('aweme_count', 0):,}")
report_lines.append("")
# 粉丝趋势
if plat == 'xhs':
fans = latest.get('fans_overview', {})
if fans:
report_lines.append(f"📈 粉丝趋势")
report_lines.append(f" 当前粉丝:{fans.get('total_fans', 0):,}")
report_lines.append(f" 7日新增:+{fans.get('7day_new_fans', 0)} / 流失:-{fans.get('7day_lost_fans', 0)}")
report_lines.append(f" 30日新增:+{fans.get('30day_new_fans', 0)} / 流失:-{fans.get('30day_lost_fans', 0)}")
trend = fans.get('7day_trend', [])
if trend:
report_lines.append(f" 近7日每日涨粉:")
for item in trend[-7:]:
ts = item.get('date', 0) / 1000
date_str = datetime.fromtimestamp(ts).strftime('%m/%d')
count = item.get('count', 0)
bar = '█' * min(count, 20)
report_lines.append(f" {date_str} +{count} {bar}")
report_lines.append("")
# 粉丝画像
portrait = latest.get('fans_portrait', {})
if portrait:
report_lines.append(f"👥 粉丝画像")
gender = portrait.get('gender', {})
if gender:
total = sum(gender.values()) or 1
parts = [f"{k} {v}({v*100//total}%)" for k, v in gender.items()]
report_lines.append(f" 性别:{' / '.join(parts)}")
age = portrait.get('age', {})
if age:
parts = [f"{k}:{v}" for k, v in sorted(age.items(), key=lambda x: -x[1])[:5]]
report_lines.append(f" 年龄:{' / '.join(parts)}")
city = portrait.get('city', {})
if city:
parts = [f"{k}:{v}" for k, v in sorted(city.items(), key=lambda x: -x[1])[:8]]
report_lines.append(f" 城市:{' / '.join(parts)}")
interest = portrait.get('interest', {})
if interest:
parts = [f"{k}:{v}" for k, v in sorted(interest.items(), key=lambda x: -x[1])[:5]]
report_lines.append(f" 兴趣:{' / '.join(parts)}")
report_lines.append("")
# 笔记数据
note = latest.get('note_overview', {})
if note:
seven = note.get('7day', {})
thirty = note.get('30day', {})
report_lines.append(f"📝 笔记数据")
report_lines.append(f" 7日:阅读{seven.get('views',0):,} 赞{seven.get('likes',0)} 藏{seven.get('collects',0)} 评{seven.get('comments',0)} 转{seven.get('shares',0)}")
report_lines.append(f" 30日:阅读{thirty.get('views',0):,} 赞{thirty.get('likes',0)} 藏{thirty.get('collects',0)} 评{thirty.get('comments',0)} 转{thirty.get('shares',0)}")
report_lines.append("")
# 历史对比
if len(history) >= 2:
prev = history[-2]
prev_user = prev.get('user_info', {})
fans_key = 'fans_count' if plat == 'xhs' else 'follower_count'
curr_fans = user.get(fans_key, 0)
prev_fans = prev_user.get(fans_key, 0)
if prev_fans > 0:
diff = curr_fans - prev_fans
prev_time = prev.get('fetch_time', '')[:10]
report_lines.append(f"📊 对比上次({prev_time})")
report_lines.append(f" 粉丝变化:{prev_fans:,} → {curr_fans:,} ({'+' if diff >= 0 else ''}{diff})")
report_lines.append("")
return '\n'.join(report_lines)
def generate_analysis():
"""生成完整分析报告(汇总+单篇+两层分析)"""
lines = []
now = datetime.now()
lines.append(f"📊 小红书数据深度分析")
lines.append(f" 生成时间:{now.strftime('%Y-%m-%d %H:%M')}")
lines.append("")
# 加载汇总数据
history = load_history('xhs')
if not history:
lines.append("❌ 暂无汇总数据,请先执行 fetch")
return '\n'.join(lines)
latest = history[-1]
user = latest.get('user_info', {})
if not user.get('nickname'):
lines.append("❌ Cookie可能已过期")
return '\n'.join(lines)
# 基本指标
fans = latest.get('fans_overview', {})
note = latest.get('note_overview', {})
seven = note.get('7day', {})
lines.append(f"📈 7日核心指标")
lines.append(f" 粉丝:{user.get('fans_count', 0):,}")
lines.append(f" 阅读:{seven.get('views',0):,} | 赞:{seven.get('likes',0)} | 藏:{seven.get('collects',0)} | 评:{seven.get('comments',0)} | 转:{seven.get('shares',0)}")
if fans:
lines.append(f" 涨粉:+{fans.get('7day_new_fans', 0)}")
lines.append("")
# 加载单篇笔记数据
note_details = load_note_details()
if not note_details:
lines.append("⚠️ 暂无单篇笔记数据,请先执行 note-details")
return '\n'.join(lines)
# 过滤有效数据
valid_notes = [n for n in note_details if 'engage_rate' in n and n.get('views', 0) > 0]
if not valid_notes:
lines.append("⚠️ 无有效笔记数据")
return '\n'.join(lines)
# ====== 第一层:行动帮助分析 ======
lines.append(f"🏆 行动帮助排行(藏/赞比 Top5)")
by_collect_ratio = sorted(valid_notes, key=lambda x: x.get('collect_like_ratio', 0), reverse=True)
for i, n in enumerate(by_collect_ratio[:5]):
ratio = n.get('collect_like_ratio', 0)
cr = n.get('collect_rate', 0)
title = n.get('title', '?')[:28]
lines.append(f" {i+1}. {title:<29} | 藏/赞={ratio:.2f} | 藏率{cr:.1f}% | 看{n['views']:>5} 赞{n['likes']:>3} 藏{n['collects']:>3}")
lines.append("")
# ====== 第二层:情感共鸣分析 ======
lines.append(f"🔥 情感共鸣排行(转发率 Top5)")
by_share = sorted(valid_notes, key=lambda x: x.get('share_rate', 0), reverse=True)
for i, n in enumerate(by_share[:5]):
sr = n.get('share_rate', 0)
cmr = n.get('comment_rate', 0)
title = n.get('title', '?')[:28]
lines.append(f" {i+1}. {title:<29} | 转率{sr:.1f}% | 评率{cmr:.1f}% | 看{n['views']:>5} 转{n['shares']:>2} 评{n['comments']:>2}")
lines.append("")
# ====== 综合判断 ======
lines.append(f"💡 爆款信号")
# 帮到人的内容特征
best_help = by_collect_ratio[0] if by_collect_ratio else None
if best_help and best_help.get('collect_like_ratio', 0) > 1:
lines.append(f" 帮到人的内容:{best_help.get('title', '?')[:25]}(藏/赞={best_help.get('collect_like_ratio', 0):.2f})")
lines.append(f" → 读者不只是认同,是要留着用——说明给了切实的行动帮助")
else:
lines.append(f" 当前没有藏>赞的笔记,内容偏情绪共鸣,需要增加行动指引")
# 引发共鸣的内容特征
best_share = by_share[0] if by_share else None
if best_share and best_share.get('share_rate', 0) > 1:
lines.append(f" 引发传播的内容:{best_share.get('title', '?')[:25]}(转率={best_share.get('share_rate', 0):.1f}%)")
lines.append(f" → 读者觉得别人也需要看——说明触发了强共鸣")
lines.append("")
# ====== 全部笔记数据表 ======
lines.append(f"📋 全部笔记数据(按藏/赞比排序)")
for i, n in enumerate(by_collect_ratio):
title = n.get('title', '?')[:24]
lines.append(f" {i+1:>2}. {title:<25} | 看{n['views']:>5} 赞{n['likes']:>3} 藏{n['collects']:>3} 评{n['comments']:>2} 转{n['shares']:>2} | 藏/赞={n.get('collect_like_ratio',0):.2f} 藏率{n.get('collect_rate',0):.1f}% 转率{n.get('share_rate',0):.1f}%")
return '\n'.join(lines)
# ============ Cookie 检查 ============
def check_cookie(platform):
"""检查Cookie是否有效"""
if platform == 'xhs':
print("🔍 小红书:正在验证Cookie...")
data = fetch_xhs_data()
if data and data.get('user_info', {}).get('nickname'):
user = data['user_info']
print(f"✅ Cookie有效!")
print(f" 账号:{user['nickname']}")
print(f" 粉丝:{user.get('fans_count', 0):,}")
print(f" 获赞:{user.get('faved_count', 0):,}")
else:
print("❌ Cookie已过期或无效")
elif platform == 'douyin':
print("🔍 抖音:正在验证Cookie...")
data = fetch_douyin_data()
if data and data.get('user_info', {}).get('nickname'):
user = data['user_info']
print(f"✅ Cookie有效!")
print(f" 账号:{user['nickname']}")
print(f" 粉丝:{user.get('follower_count', 0):,}")
else:
print("❌ Cookie已过期或无效")
elif platform == 'wechat':
print("🔍 公众号:正在验证Cookie...")
data = fetch_wechat_article_stats()
if data and not data.get('_cookie_expired'):
user = data.get('user_info', {})
articles = data.get('articles', [])
print(f"✅ Cookie有效!")
if user.get('nickname'):
print(f" 账号:{user['nickname']}")
print(f" 获取到 {len(articles)} 篇文章")
else:
print("❌ Cookie已过期或无效")
elif platform == 'all':
check_cookie('xhs')
print()
check_cookie('douyin')
print()
check_cookie('wechat')
# ============ 主入口 ============
def main():
parser = argparse.ArgumentParser(description='自媒体账号数据监测工具')
parser.add_argument('--platform', '-p', type=str, default='all',
choices=['xhs', 'douyin', 'wechat', 'all'],
help='平台:xhs=小红书, douyin=抖音, wechat=公众号, all=全部')
parser.add_argument('--action', '-a', type=str, default='fetch-and-report',
choices=['fetch', 'report', 'fetch-and-report', 'check-cookie',
'note-details', 'analyze'],
help='操作类型')
parser.add_argument('--cookie', type=str, help='直接传入Cookie(优先于配置文件)')
args = parser.parse_args()
if args.action == 'check-cookie':
check_cookie(args.platform)
return
if args.action == 'note-details':
if args.platform == 'wechat':
# 公众号文章数据抓取
print("🔄 正在抓取公众号文章数据...")
data = fetch_wechat_article_stats(cookie=args.cookie)
if data and not data.get('_cookie_expired'):
articles = data.get('articles', [])
if articles:
save_wechat_article_details(articles)
print(f"✅ 获取到 {len(articles)} 篇文章")
for a in articles[:20]:
title = a.get('title', '?')[:28]
reads = a.get('read_num', 0)
likes = a.get('like_num', 0)
print(f" {title:<29} | 读{reads:>5} 赞{likes:>3}")
else:
print("⚠️ 未获取到文章数据")
else:
print("❌ Cookie已过期或无效")
return
print("📋 正在获取笔记列表...")
cookie = args.cookie or load_cookie('xhs')
notes = fetch_xhs_note_list(cookie=cookie)
if not notes:
print("❌ 未获取到笔记列表")
return
print(f" 获取到 {len(notes)} 篇笔记")
note_ids = [n['noteId'] for n in notes if n.get('noteId')]
print(f"🔄 正在获取 {len(note_ids)} 篇笔记的详细数据...")
details = fetch_xhs_note_details(note_ids, cookie=cookie)
# 合并标题
note_map = {n['noteId']: n.get('title', '') for n in notes}
for d in details:
if not d.get('title') and d.get('note_id') in note_map:
d['title'] = note_map[d['note_id']]
save_note_details(details)
valid = [d for d in details if 'engage_rate' in d]
print(f"\n✅ 获取完成,{len(valid)} 篇有数据")
for d in sorted(valid, key=lambda x: x.get('collect_like_ratio', 0), reverse=True):
title = d.get('title', '?')[:28]
print(f" {title:<29} | 看{d['views']:>5} 赞{d['likes']:>3} 藏{d['collects']:>3} 评{d['comments']:>2} 转{d['shares']:>2} | 藏/赞={d.get('collect_like_ratio',0):.2f}")
return
if args.action == 'analyze':
if args.platform == 'wechat':
# 公众号完整分析:抓取文章数据 + 生成分析
print("🔄 正在抓取公众号文章数据...")
data = fetch_wechat_article_stats(cookie=args.cookie)
if data and not data.get('_cookie_expired'):
articles = data.get('articles', [])
if articles:
save_wechat_article_details(articles)
print(f"✅ 获取到 {len(articles)} 篇文章")
else:
print("⚠️ 未获取到文章数据,尝试使用已有数据生成分析")
else:
print("⚠️ Cookie无效,尝试使用已有数据生成分析")
report = generate_wechat_analysis()
print("\n" + report)
report_file = DATA_DIR / f"wechat_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w') as f:
f.write(report)
print(f"\n📄 分析报告已保存到 {report_file}")
return
# 小红书完整分析:汇总 + 单篇 + 两层分析
print("🔄 正在抓取汇总数据...")
data = fetch_xhs_data(cookie=args.cookie)
if data:
history = load_history('xhs')
history.append(data)
cutoff = (datetime.now() - timedelta(days=30)).isoformat()
history = [h for h in history if h.get('fetch_time', '') > cutoff]
save_history('xhs', history)
user = data.get('user_info', {})
print(f"✅ 汇总数据:{user.get('nickname', '?')} 粉丝{user.get('fans_count', 0):,}")
print("\n📋 正在获取笔记详细数据...")
cookie = args.cookie or load_cookie('xhs')
notes = fetch_xhs_note_list(cookie=cookie)
if notes:
note_ids = [n['noteId'] for n in notes if n.get('noteId')]
details = fetch_xhs_note_details(note_ids, cookie=cookie)
note_map = {n['noteId']: n.get('title', '') for n in notes}
for d in details:
if not d.get('title') and d.get('note_id') in note_map:
d['title'] = note_map[d['note_id']]
save_note_details(details)
report = generate_analysis()
print("\n" + report)
report_file = DATA_DIR / f"analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w') as f:
f.write(report)
print(f"\n📄 分析报告已保存到 {report_file}")
return
platforms = ['xhs', 'douyin', 'wechat'] if args.platform == 'all' else [args.platform]
if args.action in ['fetch', 'fetch-and-report']:
for plat in platforms:
if plat == 'wechat':
plat_name = '公众号'
print(f"\n🔄 正在抓取{plat_name}文章数据...")
data = fetch_wechat_article_stats(cookie=args.cookie)
if data and not data.get('_cookie_expired'):
articles = data.get('articles', [])
if articles:
save_wechat_article_details(articles)
user = data.get('user_info', {})
print(f"✅ {plat_name}数据抓取完成")
if user.get('nickname'):
print(f" 账号:{user['nickname']}")
print(f" 获取到 {len(articles)} 篇文章")
else:
print(f"⚠️ {plat_name}未获取到文章数据")
else:
print(f"❌ {plat_name}Cookie已过期或无效")
continue
plat_name = '小红书' if plat == 'xhs' else '抖音'
print(f"\n🔄 正在抓取{plat_name}数据...")
if plat == 'xhs':
data = fetch_xhs_data(cookie=args.cookie)
else:
data = fetch_douyin_data(cookie=args.cookie)
if data:
history = load_history(plat)
history.append(data)
cutoff = (datetime.now() - timedelta(days=30)).isoformat()
history = [h for h in history if h.get('fetch_time', '') > cutoff]
save_history(plat, history)
user = data.get('user_info', {})
if user.get('nickname'):
print(f"✅ {plat_name}数据抓取完成")
print(f" 账号:{user['nickname']}")
fans_key = 'fans_count' if plat == 'xhs' else 'follower_count'
print(f" 粉丝:{user.get(fans_key, 0):,}")
else:
print(f"⚠️ {plat_name}未获取到用户信息,Cookie可能已过期")
else:
print(f"❌ {plat_name}数据抓取失败")
if args.action in ['report', 'fetch-and-report']:
report = generate_report(platform=args.platform)
print("\n" + report)
report_file = DATA_DIR / f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
with open(report_file, 'w') as f:
f.write(report)
print(f"\n📄 报告已保存到 {report_file}")
if __name__ == '__main__':
main()