文件预览

capital.py

查看 Cny Rmb China A Shares Stock 技能包中的文件内容。

文件内容

scripts/analysis/capital.py

#!/usr/bin/env python3
"""
资金分析模块 V2.4
- 北向资金升级:十大成交股、行业流向、持仓变动Top10
- 主力/游资/散户资金拆分
- 龙虎榜集成
- 大单/中单/小单成交占比
"""

import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

try:
    import akshare as ak
    import warnings
    warnings.filterwarnings("ignore")
    HAS_AKSHARE = True
except ImportError:
    HAS_AKSHARE = False

from utils.common import log, format_yi


class CapitalAnalyzer:
    """资金流向分析器"""
    
    def __init__(self):
        self.has_ak = HAS_AKSHARE
    
    # ============================================================
    # 北向资金升级
    # ============================================================
    def fetch_northbound_flow(self):
        """北向资金基础数据(沪股通/深股通净买入)"""
        if not self.has_ak:
            return {}
        result = {"沪股通": {}, "深股通": {}, "合计": {}}
        try:
            df = ak.stock_hsgt_north_net_flow_in_em()
            if df is not None and not df.empty:
                last_row = df.iloc[-1]
                result["date"] = str(last_row.get("date", ""))
                result["north_net"] = round(float(last_row.get("value", 0)), 2)
        except Exception:
            pass
        try:
            df2 = ak.stock_hsgt_fund_flow_summary_em()
            if df2 is not None and not df2.empty:
                last = df2.iloc[-1]
                for prefix, key in [("沪股通", "沪股通"), ("深股通", "深股通")]:
                    try:
                        result[key] = {
                            "buy": round(float(last.get(f"{prefix}-买入", 0)), 2),
                            "sell": round(float(last.get(f"{prefix}-卖出", 0)), 2),
                            "net": round(float(last.get(f"{prefix}-净买入", 0)), 2),
                        }
                    except (ValueError, TypeError):
                        result[key] = {"buy": 0, "sell": 0, "net": 0}
                hn = result["沪股通"].get("net", 0)
                sn = result["深股通"].get("net", 0)
                result["合计"]["net"] = round(hn + sn, 2)
        except Exception:
            pass
        return result
    
    def fetch_northbound_top10(self):
        """北向资金十大成交股"""
        if not self.has_ak:
            return []
        try:
            df = ak.stock_hsgt_north_acc_flow_in_em(symbol="北向资金")
            if df is not None and not df.empty:
                result = []
                for _, row in df.head(10).iterrows():
                    result.append({
                        "code": str(row.get("代码", "")),
                        "name": str(row.get("名称", "")),
                        "net_buy": round(float(row.get("净买入", 0)), 2),
                        "buy": round(float(row.get("买入", 0)), 2),
                        "sell": round(float(row.get("卖出", 0)), 2),
                    })
                return result
        except Exception as e:
            log(f"⚠️  北向十大成交股获取失败: {e}")
        return []
    
    def fetch_northbound_industry_flow(self):
        """北向资金行业流向分布"""
        if not self.has_ak:
            return []
        try:
            df = ak.stock_hsgt_north_industry_flow_em()
            if df is not None and not df.empty:
                result = []
                for _, row in df.head(15).iterrows():
                    result.append({
                        "industry": str(row.get("行业", "")),
                        "net_buy": round(float(row.get("净买入", 0)), 2),
                        "buy": round(float(row.get("买入", 0)), 2),
                        "sell": round(float(row.get("卖出", 0)), 2),
                    })
                return result
        except Exception as e:
            log(f"⚠️  北向行业流向获取失败: {e}")
        return []
    
    def fetch_northbound_holding_change(self):
        """北向资金持仓变动Top10"""
        if not self.has_ak:
            return []
        try:
            df = ak.stock_hsgt_hold_stock_em(market="北向", indicator="今日排行")
            if df is not None and not df.empty:
                result = []
                for _, row in df.head(10).iterrows():
                    result.append({
                        "code": str(row.get("代码", "")),
                        "name": str(row.get("名称", "")),
                        "holding": round(float(row.get("持股数量", 0)), 2),
                        "holding_mv": round(float(row.get("持股市值", 0)), 2),
                        "change": round(float(row.get("持股变动", 0)), 2),
                        "ratio": round(float(row.get("持股占比", 0)), 2),
                    })
                return result
        except Exception as e:
            log(f"⚠️  北向持仓变动获取失败: {e}")
        return []
    
    # ============================================================
    # 主力/游资/散户资金拆分
    # ============================================================
    def fetch_main_force_flow(self):
        """主力/游资/散户资金流向统计"""
        if not self.has_ak:
            return {}
        result = {}
        try:
            # 个股资金流向
            df = ak.stock_individual_fund_flow_rank(indicator="今日")
            if df is not None and not df.empty:
                # 按资金类型分类统计
                total_main = 0
                total_hot = 0
                total_retail = 0
                for _, row in df.iterrows():
                    try:
                        main_net = float(row.get("主力净流入-净额", 0))
                        total_main += main_net
                    except (ValueError, TypeError):
                        pass
                result["主力净流入"] = round(total_main, 2)
                result["个股数量"] = len(df)
        except Exception as e:
            log(f"⚠️  主力资金获取失败: {e}")
        
        # 大单/中单/小单统计
        try:
            df2 = ak.stock_market_fund_flow()
            if df2 is not None and not df2.empty:
                last = df2.iloc[-1]
                result["超大单"] = round(float(last.get("超大单", 0)), 2) if "超大单" in last.index else 0
                result["大单"] = round(float(last.get("大单", 0)), 2) if "大单" in last.index else 0
                result["中单"] = round(float(last.get("中单", 0)), 2) if "中单" in last.index else 0
                result["小单"] = round(float(last.get("小单", 0)), 2) if "小单" in last.index else 0
        except Exception as e:
            log(f"⚠️  大单统计获取失败: {e}")
        
        return result
    
    def fetch_single_stock_fund_flow(self, code):
        """单只股票资金流向详情"""
        if not self.has_ak:
            return {}
        try:
            df = ak.stock_individual_fund_flow(stock=code, market="sh" if code.startswith("6") else "sz")
            if df is not None and not df.empty:
                last = df.iloc[-1]
                return {
                    "主力净流入": round(float(last.get("主力净流入-净额", 0)), 2),
                    "主力净流入占比": round(float(last.get("主力净流入-净占比", 0)), 2),
                    "超大单净流入": round(float(last.get("超大单净流入-净额", 0)), 2),
                    "大单净流入": round(float(last.get("大单净流入-净额", 0)), 2),
                    "中单净流入": round(float(last.get("中单净流入-净额", 0)), 2),
                    "小单净流入": round(float(last.get("小单净流入-净额", 0)), 2),
                }
        except Exception as e:
            log(f"⚠️  个股资金流向获取失败: {e}")
        return {}
    
    # ============================================================
    # 龙虎榜
    # ============================================================
    def fetch_dragon_tiger(self):
        """龙虎榜数据"""
        if not self.has_ak:
            return []
        try:
            from datetime import datetime
            today = datetime.now().strftime("%Y%m%d")
            df = ak.stock_lhb_detail_em(start_date=today, end_date=today)
            if df is not None and not df.empty:
                result = []
                for _, row in df.head(15).iterrows():
                    result.append({
                        "code": str(row.get("代码", "")),
                        "name": str(row.get("名称", "")),
                        "change_pct": round(float(row.get("涨跌幅", 0)), 2),
                        "net_buy": round(float(row.get("龙虎榜净买额", 0)), 2),
                        "buy": round(float(row.get("龙虎榜买入额", 0)), 2),
                        "sell": round(float(row.get("龙虎榜卖出额", 0)), 2),
                        "reason": str(row.get("上榜原因", "")),
                    })
                return result
        except Exception as e:
            log(f"⚠️  龙虎榜获取失败: {e}")
        return []