文件预览

engine.py

查看 AI Market Research 技能包中的文件内容。

文件内容

engine.py

#!/usr/bin/env python3
"""
ai-market-research 核心编排引擎

职责:
1. 接收任务参数 (JSON via stdin)
2. 调用 crawl4ai 抓取指定 or 自动发现来源
3. 调用 trendradar 获取关联热点
4. 调用 product-research 分析框架
5. 整合结果并生成报告
6. 保存到 agentmemory
"""

import sys
import json
import os
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any

# 配置日志
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('ai-market-research')

# 工作目录
WORKSPACE = Path('/Users/tom/.openclaw/workspace')
OUTPUT_DIR = WORKSPACE / 'output' / 'ai-market-research'
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

class MarketResearchEngine:
    def __init__(self, params: Dict[str, Any]):
        self.params = params
        self.topic = params['topic']
        self.depth = params.get('depth', 'standard')
        self.sources = params.get('sources', [])
        self.output_format = params.get('output_format', 'markdown')
        self.compare_previous = params.get('compare_previous', True)

        self.artifacts = {}
        self.report_data = {}

        logger.info(f"启动研究: topic='{self.topic}', depth={self.depth}")

    async def run(self) -> str:
        """执行完整研究流程,返回报告路径"""
        try:
            # 阶段 1: 数据采集
            await self._collect_data()

            # 阶段 2: 历史对比
            if self.compare_previous:
                await self._compare_with_history()

            # 阶段 3: 分析与整合
            await self._analyze()

            # 阶段 4: 生成报告
            report_path = await self._generate_report()

            # 阶段 5: 记忆存储
            await self._store_memory()

            return report_path
        except Exception as e:
            logger.error(f"执行失败: {e}", exc_info=True)
            raise

    async def _collect_data(self):
        """调用 crawl4ai 和 trendradar 采集数据"""
        logger.info("📡 开始数据采集...")

        # 1. 获取热点舆情
        try:
            # 实际实现应调用 trendradar MCP 工具,这里使用模拟数据确保 standalone 可运行
            trend_data = await self._simulate_trendradar_search()
            self.artifacts['trendradar'] = trend_data
            logger.info(f"✅ TrendRadar: 获取 {len(trend_data.get('news', []))} 条新闻")
        except Exception as e:
            logger.warning(f"TrendRadar 调用失败: {e}")
            self.artifacts['trendradar'] = {'news': [], 'error': str(e)}

        # 2. 深度网页抓取
        try:
            crawl_results = await self._run_crawl4ai()
            self.artifacts['crawl4ai'] = crawl_results
            logger.info(f"✅ Crawl4AI: 抓取 {len(crawl_results)} 个页面")
        except Exception as e:
            logger.warning(f"Crawl4AI 调用失败: {e}")
            self.artifacts['crawl4ai'] = {'pages': [], 'error': str(e)}

    async def _simulate_trendradar_search(self) -> Dict:
        """模拟 trendradar 搜索 (实际需通过 MCP 调用)"""
        # 实际实现应调用 OpenClaw MCP 工具:
        # await call_mcp_tool('trendradar', 'search_news', {'query': self.topic, ...})
        # 返回模拟数据供 standalone 测试使用
        mock_news = [
            {'title': f'{self.topic} 市场爆发式增长', 'platform': 'weibo', 'rank': 1},
            {'title': f'{self.topic} 行业分析报告', 'platform': 'zhihu', 'rank': 2},
            {'title': f'{self.topic} 最新动态:政策支持', 'platform': 'baidu', 'rank': 3},
            {'title': f'{self.topic} 技术突破引关注', 'platform': 'toutiao', 'rank': 4},
            {'title': f'{self.topic} 投资热潮持续', 'platform': 'wallstreetcn-hot', 'rank': 5},
        ]
        return {
            'query': self.topic,
            'timestamp': datetime.now().isoformat(),
            'news': mock_news
        }

    async def _run_crawl4ai(self) -> List[Dict]:
        """执行 crawl4ai 抓取"""
        pages = []
        # 如果未指定 sources,根据 topic 自动发现 (需要 google 搜索 + 筛选)
        urls = self.sources or await self._discover_sources()

        # 限制数量基于 depth
        limits = {'quick': 5, 'standard': 20, 'deep': 50}
        max_urls = limits.get(self.depth, 20)
        urls = urls[:max_urls]

        for url in urls:
            try:
                # 实际调用 crawl4ai MCP 工具
                result = await self._crawl_page(url)
                if result:
                    pages.append(result)
            except Exception as e:
                logger.debug(f"抓取失败 {url}: {e}")
                continue

        return pages

    async def _discover_sources(self) -> List[str]:
        """自动发现相关来源 (简化版)"""
        # 实际实现可结合 Google 搜索 + site: 筛选
        # 这里返回空列表让用户指定 sources
        logger.warning("未指定 sources,自动发现功能待实现")
        return []

    async def _crawl_page(self, url: str) -> Dict:
        """抓取单个页面 (模拟)"""
        # 实际实现应调用 OpenClaw MCP:
        # await call_mcp_tool('crawl4ai', 'crawl', {'url': url, 'extract_mode': 'markdown'})
        # 这里返回模拟数据用于测试
        return {
            'url': url,
            'title': f'{self.topic} - {url.split("//")[1].split("/")[0]}',
            'content': f"{self.topic} 的详细分析内容(来自 {url})...",
            'timestamp': datetime.now().isoformat()
        }

    async def _compare_with_history(self):
        """与历史研究对比"""
        logger.info("📊 历史对比中...")
        try:
            try:
                from agentmemory import memory_search
                results = memory_search(query=self.topic, limit=5, corpus='memory')
                # 确保 historical 是 dict 格式,report_data 期望 {'results': [...]}
                if isinstance(results, dict):
                    self.report_data['historical'] = results
                    count = len(results.get('results', []))
                else:
                    self.report_data['historical'] = {'results': results}
                    count = len(results)
                logger.info(f"✅ 找到 {count} 条历史记录")
            except ImportError:
                logger.warning("agentmemory 未安装,跳过历史对比")
                self.report_data['historical'] = {'results': []}
        except Exception as e:
            logger.warning(f"历史检索失败: {e}")
            self.report_data['historical'] = {'results': []}

    async def _analyze(self):
        """调用 product-research 分析框架生成结论"""
        logger.info("🧠 执行分析...")
        # 融合 crawl4ai 内容 + trendradar 热点 + 历史对比
        # 调用 product-research 技能的分析器
        # 简化: 这里生成一个雏形
        # 从抓取内容中简单提取竞品信息(实际应使用 NLP/LLM 归纳)
        pages = self.artifacts.get('crawl4ai', [])
        competitors = list(set([p.get('title', '').split('-')[0].strip() for p in pages if p.get('title')]))[:5]

        analysis = {
            'topic': self.topic,
            'summary': f'{self.topic} 的市场研究摘要({self.depth} 深度)',
            'trends': self.artifacts.get('trendradar', {}).get('news', [])[:5],
            'competitors': competitors,
            'opportunities': [
                f'{self.topic} 领域热度上升',
                '政策支持力度加大',
                '技术迭代加速带来新机会'
            ],
            'risks': [
                '市场竞争加剧',
                '技术路线不确定性',
                '法规变化风险'
            ],
            'recommendations': [
                '持续监控竞品动态',
                '关注政策导向',
                '建立技术护城河'
            ],
        }
        self.report_data['analysis'] = analysis

    async def _generate_report(self) -> Path:
        """生成最终报告文件"""
        logger.info("📝 生成报告中...")
        now = datetime.now()
        timestamp = now.strftime('%Y-%m-%d %H:%M')
        filename = f"{self.topic.replace(' ', '_')}_{self.depth}_{now.strftime('%Y%m%d_%H%M')}.{self.output_format}"
        report_path = OUTPUT_DIR / filename

        # 渲染模板
        template = self._load_template()
        content = self._render_report(template)

        report_path.write_text(content, encoding='utf-8')
        logger.info(f"✅ 报告已生成: {report_path}")
        return report_path

    def _load_template(self) -> str:
        """加载报告模板"""
        if self.output_format == 'markdown':
            return """# {topic} 市场研究报告

**生成时间**: {timestamp}
**研究深度**: {depth}

## 📰 热点追踪

{trends_section}

## 🔍 深度分析

{analysis_section}

## 📈 历史对比

{historical_section}

## 💡 关键洞察

{insights_section}

---

*报告由 OpenClaw ai-market-research 技能自动生成*
"""
        return "# {topic}\n\n{content}"

    def _render_report(self, template: str) -> str:
        """填充模板"""
        analysis = self.report_data.get('analysis', {})

        trends = analysis.get('trends', [])
        trends_md = "\n".join([f"- **{t['title']}** ({t['platform']} 第{t['rank']}名)" for t in trends[:10]]) if trends else "_暂无热点数据_"

        analysis_md = f"""
### 研究摘要
{analysis.get('summary', 'N/A')}

### 主要机会 ✅
{chr(10).join([f'✅ {o}' for o in analysis.get('opportunities', [])])}

### 关键风险 ⚠️
{chr(10).join([f'⚠️ {r}' for r in analysis.get('risks', [])])}

### 行动建议 💡
{chr(10).join([f'💡 {r}' for r in analysis.get('recommendations', [])])}

### 识别竞品
{chr(10).join(['- ' + c for c in analysis.get('competitors', [])]) if analysis.get('competitors') else '暂无'}
"""

        historical = self.report_data.get('historical', {}).get('results', [])
        if historical:
            hist_md = "\n".join([f"- {h.get('path', '未知')} (时间: {h.get('timestamp', 'N/A')})" for h in historical[:3]])
        else:
            hist_md = "_无历史研究记录_"

        return template.format(
            topic=self.topic,
            timestamp=datetime.now().strftime('%Y-%m-%d %H:%M'),
            depth=self.depth,
            trends_section=trends_md,
            analysis_section=analysis_md,
            historical_section=hist_md,
            insights_section="_AI 洞察待后续集成_"
        )

    async def _store_memory(self):
        """保存关键结论到 agentmemory"""
        logger.info("💾 存储记忆中...")
        try:
            try:
                from agentmemory import memory_save
                memory_save(
                    content=f"市场研究: {self.topic} - {self.report_data.get('analysis', {}).get('summary', 'N/A')}",
                    type='fact',
                    concepts='market-research,' + self.topic,
                    files=str(self.artifacts.keys())
                )
                logger.info("✅ 记忆存储完成")
            except ImportError:
                logger.warning("agentmemory 未安装,跳过记忆存储")
        except Exception as e:
            logger.warning(f"记忆存储失败: {e}")

def main():
    """入口: 从 stdin 读取 JSON 参数"""
    try:
        raw = sys.stdin.read()
        params = json.loads(raw)
    except Exception as e:
        logger.error(f"参数解析失败: {e}")
        sys.exit(1)

    engine = MarketResearchEngine(params)
    try:
        report_path = asyncio.run(engine.run())
        print(f"✅ 研究完成\n📄 报告: {report_path}")
        sys.exit(0)
    except Exception as e:
        logger.error(f"执行失败: {e}", exc_info=True)
        sys.exit(1)

if __name__ == '__main__':
    main()