文件预览

analyze.py

查看 A股分时量能分析 技能包中的文件内容。

文件内容

scripts/analyze.py

#!/usr/bin/env python3
# /// script
# requires-python = ">=3.10"
# dependencies = []
# ///
"""
A股实时行情与分时量能分析工具

数据源:新浪财经(统一接口)
支持:沪市(sh)、深市(sz) 股票

Usage:
    uv run analyze.py 600789              # 单只股票
    uv run analyze.py 600789 002446       # 多只股票
    uv run analyze.py 600789 --minute     # 分时量能分析
    uv run analyze.py 600789 --json       # JSON输出
"""

import argparse
import json
import re
import sys
import urllib.request
from datetime import datetime
from typing import Optional


def get_sina_symbol(code: str) -> str:
    """根据股票代码生成新浪格式代码"""
    code = code.upper().replace("SH", "").replace("SZ", "").replace(".", "")
    
    # 沪市: 6开头
    if code.startswith("6"):
        return "sh" + code
    # 深市: 0/3开头
    elif code.startswith(("0", "3")):
        return "sz" + code
    # 北交所: 8/4开头
    elif code.startswith(("8", "4")):
        return "bj" + code
    else:
        return "sh" + code


def fetch_realtime_sina(symbols: list[str]) -> dict[str, dict]:
    """从新浪获取实时行情(支持批量)
    
    新浪接口返回格式:
    var hq_str_sh600789="名称,今开,昨收,现价,最高,最低,买一,卖一,成交量(股),成交额(元),...";
    
    字段说明:
    0: 名称
    1: 今开
    2: 昨收  
    3: 现价
    4: 最高
    5: 最低
    6: 买一价
    7: 卖一价
    8: 成交量(股)
    9: 成交额(元)
    """
    result = {}
    
    try:
        codes_str = ",".join(symbols)
        url = f"https://hq.sinajs.cn/list={codes_str}"
        
        req = urllib.request.Request(url, headers={
            "Referer": "https://finance.sina.com.cn",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
        })
        resp = urllib.request.urlopen(req, timeout=10)
        text = resp.read().decode("gbk")
        
        # 解析每行
        for line in text.strip().split("\n"):
            line = line.strip()
            if not line:
                continue
            
            # var hq_str_sh600789="数据";
            match = re.match(r'var hq_str_(\w+)="([^"]*)"', line)
            if not match:
                continue
            
            symbol = match.group(1)
            data_str = match.group(2)
            
            if not data_str:
                continue
            
            fields = data_str.split(",")
            if len(fields) < 32:
                continue
            
            name = fields[0]
            open_price = float(fields[1]) if fields[1] else None
            pre_close = float(fields[2]) if fields[2] else None
            price = float(fields[3]) if fields[3] else None
            high = float(fields[4]) if fields[4] else None
            low = float(fields[5]) if fields[5] else None
            volume = int(float(fields[8])) if fields[8] else 0  # 股
            amount = float(fields[9]) if fields[9] else 0  # 元
            
            if not price or price <= 0:
                continue
            
            # 计算涨跌
            change_amt = price - pre_close if pre_close else 0
            change_pct = (change_amt / pre_close * 100) if pre_close and pre_close > 0 else 0
            
            # 换手率需要总股本,这里先留空
            result[symbol] = {
                "code": symbol[2:],  # 去掉sh/sz前缀
                "name": name,
                "price": price,
                "open": open_price,
                "pre_close": pre_close,
                "high": high,
                "low": low,
                "volume": volume // 100,  # 转换为手
                "amount": amount,
                "change_amt": round(change_amt, 2),
                "change_pct": round(change_pct, 2),
                "turnover": None,  # 新浪实时接口不提供换手率
            }
            
    except Exception as e:
        print(f"新浪实时接口错误: {e}", file=sys.stderr)
    
    return result


def fetch_minute_data_sina(symbol: str, count: int = 250) -> list[dict]:
    """从新浪获取分时K线数据
    
    接口: CN_MarketDataService.getKLineData
    返回JSON数组,每条记录包含:
    - day: 时间 (2026-01-27 09:31:00)
    - open/high/low/close: OHLC价格
    - volume: 成交量(股)
    - amount: 成交额(元)
    """
    url = f"https://quotes.sina.cn/cn/api/jsonp_v2.php/var%20_{symbol}=/CN_MarketDataService.getKLineData?symbol={symbol}&scale=1&ma=no&datalen={count}"
    
    try:
        req = urllib.request.Request(url, headers={
            "Referer": "https://finance.sina.com.cn",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        })
        resp = urllib.request.urlopen(req, timeout=10)
        text = resp.read().decode("utf-8")
        
        # 解析JSONP: var _xxx=([...])
        match = re.search(r"\(\[(.*)\]\)", text, re.DOTALL)
        if not match:
            return []
        
        data = json.loads("[" + match.group(1) + "]")
        result = []
        for item in data:
            result.append({
                "time": item["day"],
                "open": float(item["open"]),
                "high": float(item["high"]),
                "low": float(item["low"]),
                "close": float(item["close"]),
                "volume": int(item["volume"]),  # 股
                "amount": float(item["amount"]),  # 元
            })
        return result
        
    except Exception as e:
        print(f"新浪分时接口错误: {e}", file=sys.stderr)
    
    return []


def analyze_minute_volume(minute_data: list[dict]) -> dict:
    """分析分时量能"""
    if not minute_data:
        return {"error": "无分时数据"}
    
    # 过滤交易时段数据
    trading_data = [
        d for d in minute_data
        if d["volume"] > 0 and "09:25" <= d["time"][-8:-3] <= "15:00"
    ]
    
    if not trading_data:
        return {"error": "无有效交易数据"}
    
    # 统计各时段成交量
    total_vol = sum(d["volume"] for d in trading_data)
    
    def period_vol(start: str, end: str) -> int:
        return sum(
            d["volume"] for d in trading_data
            if start <= d["time"][-8:-3] < end
        )
    
    open_30 = period_vol("09:30", "10:00")
    mid_am = period_vol("10:00", "11:30")
    mid_pm = period_vol("13:00", "14:30")
    close_30 = period_vol("14:30", "15:01")
    
    # 放量时段 TOP 10
    sorted_by_vol = sorted(trading_data, key=lambda x: x["volume"], reverse=True)[:10]
    top_volumes = [
        {
            "time": d["time"][-8:],
            "price": d["close"],
            "volume": d["volume"] // 100,  # 转换为手
            "amount": d["amount"],
        }
        for d in sorted_by_vol
    ]
    
    # 主力动向判断
    signals = []
    if total_vol > 0:
        if close_30 / total_vol > 0.25:
            signals.append("尾盘大幅放量,可能有主力抢筹或出货")
        elif close_30 / total_vol > 0.15:
            signals.append("尾盘有一定放量")
        if open_30 / total_vol > 0.30:
            signals.append("早盘主力抢筹明显")
        if open_30 / total_vol > 0.40:
            signals.append("早盘放量异常,主力强势介入")
    
    # 检测涨停/跌停
    last_price = trading_data[-1]["close"] if trading_data else 0
    highest_vol_price = sorted_by_vol[0]["close"] if sorted_by_vol else 0
    if last_price > 0 and abs(last_price - highest_vol_price) < 0.01:
        signals.append("封板状态,关注封单量")
    
    return {
        "total_volume": total_vol // 100,  # 手
        "total_amount": sum(d["amount"] for d in trading_data),
        "distribution": {
            "open_30min": {
                "volume": open_30 // 100,
                "percent": round(open_30 / total_vol * 100, 1) if total_vol else 0,
            },
            "mid_am": {
                "volume": mid_am // 100,
                "percent": round(mid_am / total_vol * 100, 1) if total_vol else 0,
            },
            "mid_pm": {
                "volume": mid_pm // 100,
                "percent": round(mid_pm / total_vol * 100, 1) if total_vol else 0,
            },
            "close_30min": {
                "volume": close_30 // 100,
                "percent": round(close_30 / total_vol * 100, 1) if total_vol else 0,
            },
        },
        "top_volumes": top_volumes,
        "signals": signals,
    }


def format_realtime(data: dict) -> str:
    """格式化实时行情输出"""
    change_symbol = "+" if data["change_pct"] >= 0 else ""
    turnover_str = f"换手: {data['turnover']:.2f}%" if data.get("turnover") else ""
    
    lines = [
        f"{'='*60}",
        f"股票: {data['name']} ({data['code']})",
        f"{'='*60}",
        f"",
        f"【实时行情】",
        f"  现价: {data['price']:.2f}  涨跌: {change_symbol}{data['change_pct']:.2f}%",
        f"  今开: {data['open']:.2f}  最高: {data['high']:.2f}  最低: {data['low']:.2f}",
        f"  昨收: {data['pre_close']:.2f}  {turnover_str}",
        f"  成交量: {data['volume']/10000:.1f}万手  成交额: {data['amount']/100000000:.2f}亿",
    ]
    return "\n".join(lines)


def format_minute_analysis(analysis: dict, name: str = "") -> str:
    """格式化分时分析输出"""
    if "error" in analysis:
        return f"分时分析错误: {analysis['error']}"
    
    lines = [
        f"",
        f"【分时量能分析】{name}",
        f"  全天成交: {analysis['total_volume']}手 ({analysis['total_amount']/10000:.1f}万元)",
        f"",
        f"  成交分布:",
        f"    早盘30分(9:30-10:00): {analysis['distribution']['open_30min']['volume']}手 ({analysis['distribution']['open_30min']['percent']}%)",
        f"    上午中段(10:00-11:30): {analysis['distribution']['mid_am']['volume']}手 ({analysis['distribution']['mid_am']['percent']}%)",
        f"    下午中段(13:00-14:30): {analysis['distribution']['mid_pm']['volume']}手 ({analysis['distribution']['mid_pm']['percent']}%)",
        f"    尾盘30分(14:30-15:00): {analysis['distribution']['close_30min']['volume']}手 ({analysis['distribution']['close_30min']['percent']}%)",
        f"",
        f"  放量时段 TOP 10:",
    ]
    
    for item in analysis["top_volumes"]:
        lines.append(f"    {item['time']} 价格:{item['price']:.2f} 成交:{item['volume']}手 金额:{item['amount']/10000:.1f}万")
    
    if analysis["signals"]:
        lines.append(f"")
        lines.append(f"  【主力动向判断】")
        for signal in analysis["signals"]:
            lines.append(f"    🔥 {signal}")
    
    return "\n".join(lines)


def analyze_stock(code: str, with_minute: bool = False, realtime_cache: dict = None) -> dict:
    """分析单只股票"""
    sina_symbol = get_sina_symbol(code)
    
    # 获取实时行情(支持缓存以批量获取)
    if realtime_cache and sina_symbol in realtime_cache:
        realtime = realtime_cache[sina_symbol]
    else:
        realtime_data = fetch_realtime_sina([sina_symbol])
        realtime = realtime_data.get(sina_symbol)
    
    if not realtime:
        return {"error": f"无法获取 {code} 的行情数据"}
    
    result = {
        "code": code,
        "name": realtime["name"],
        "realtime": realtime,
        "updated_at": datetime.now().isoformat(),
    }
    
    # 分时分析
    if with_minute:
        minute_data = fetch_minute_data_sina(sina_symbol)
        minute_analysis = analyze_minute_volume(minute_data)
        result["minute_analysis"] = minute_analysis
    
    return result


def main():
    parser = argparse.ArgumentParser(description="A股实时行情与分时量能分析")
    parser.add_argument("codes", nargs="+", help="股票代码,如 600789 002446")
    parser.add_argument("--minute", "-m", action="store_true", help="包含分时量能分析")
    parser.add_argument("--json", "-j", action="store_true", help="JSON格式输出")
    
    args = parser.parse_args()
    
    # 批量获取实时行情
    sina_symbols = [get_sina_symbol(code) for code in args.codes]
    realtime_cache = fetch_realtime_sina(sina_symbols)
    
    results = []
    for code in args.codes:
        result = analyze_stock(code, with_minute=args.minute, realtime_cache=realtime_cache)
        results.append(result)
    
    if args.json:
        print(json.dumps(results, ensure_ascii=False, indent=2))
    else:
        for result in results:
            if "error" in result:
                print(f"错误: {result['error']}")
                continue
            
            print(format_realtime(result["realtime"]))
            
            if args.minute and "minute_analysis" in result:
                print(format_minute_analysis(result["minute_analysis"], result["name"]))
            
            print()


if __name__ == "__main__":
    main()