文件预览

app.py

查看 Cny Rmb China A Shares Stock 技能包中的文件内容。

文件内容

scripts/web/app.py

#!/usr/bin/env python3
"""
A股分析 v3.0 — Web Dashboard
Flask + Chart.js 实时行情大屏
"""

import sys
import os
import json
import logging
from datetime import datetime

SCRIPT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, SCRIPT_DIR)

from flask import Flask, render_template, jsonify, request
from config import VERSION, WEB_PORT
from holiday import is_trading_day, format_trading_status, market_is_open, get_market_phase
from data_pipeline import get_cache, get_rate_limiter

logger = logging.getLogger("astock.web")


def create_app():
    """Create and configure the Flask dashboard application.

    Registers the dashboard page plus the JSON API endpoints. Every API
    endpoint answers with an envelope of the form ``{"ok": bool, ...}``;
    failures carry an ``"error"`` message and a non-200 status code.

    Returns:
        The configured ``Flask`` application instance.
    """
    template_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
    app = Flask(__name__, template_folder=template_dir)

    # Emit non-ASCII (Chinese) text verbatim instead of \uXXXX escapes.
    # Flask >= 2.2 exposes this on the JSON provider and Flask >= 2.3 ignores
    # the legacy JSON_AS_ASCII config key, so prefer the provider when present.
    json_provider = getattr(app, "json", None)
    if json_provider is not None:
        json_provider.ensure_ascii = False
    else:
        app.config["JSON_AS_ASCII"] = False

    class _DefaultArgs:
        """Default option set handed to ``collect_all_data`` by /api/data."""
        no_weibo = False
        no_market = False
        platforms = None
        proxy = None
        watchlist = None

    def _failure(context, exc):
        """Log *exc* with its traceback and build the JSON 500 response.

        Must be called from inside an ``except`` block so that
        ``logger.exception`` can attach the active traceback.
        """
        logger.exception("%s error: %s", context, exc)
        return jsonify({"ok": False, "error": str(exc)}), 500

    def _bad_request(message):
        """Build a JSON 400 response for invalid client input."""
        return jsonify({"ok": False, "error": message}), 400

    @app.route("/")
    def index():
        """Render the dashboard home page."""
        return render_template("dashboard.html", version=VERSION)

    @app.route("/api/data")
    def api_data():
        """Return the full aggregated data set."""
        try:
            # Imported lazily, as in the rest of this module's handlers.
            from engine import collect_all_data

            data = collect_all_data(_DefaultArgs())
            # Remove non-serializable fields
            data.pop("_cache_stats", None)
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _failure("Data fetch", e)

    @app.route("/api/stock/<code>")
    def api_stock(code):
        """Return quote info and fund flow for a single stock code."""
        try:
            from engine import fetch_single_stock
            from analysis.capital import CapitalAnalyzer

            info = fetch_single_stock(code)
            if not info:
                return jsonify({"ok": False, "error": f"Stock {code} not found"}), 404

            cap = CapitalAnalyzer()
            info["fund_flow"] = cap.fetch_single_stock_fund_flow(code)
            return jsonify({"ok": True, "data": info})
        except Exception as e:
            return _failure("Stock query", e)

    @app.route("/api/hotsearch")
    def api_hotsearch():
        """Return stock-related hot-search items with sentiment counts."""
        try:
            from sources.multi_platform import fetch_all_hot_sources, filter_stock_keywords, merge_multi_platform_results
            from analysis.hotsearch import HotSearchAnalyzer

            all_hot = fetch_all_hot_sources()
            merged = merge_multi_platform_results(all_hot)
            stock_hot = filter_stock_keywords(merged)

            analyzer = HotSearchAnalyzer()
            result = analyzer.batch_sentiment_analysis(stock_hot)

            return jsonify({
                "ok": True,
                "data": {
                    "total": len(stock_hot),
                    "positive": len(result["positive"]),
                    "negative": len(result["negative"]),
                    "neutral": len(result["neutral"]),
                    # Only the first 30 items are sent to the client.
                    "items": stock_hot[:30],
                }
            })
        except Exception as e:
            return _failure("Hot search", e)

    @app.route("/api/capital")
    def api_capital():
        """Return capital-flow data (northbound, main force, dragon-tiger)."""
        try:
            from analysis.capital import CapitalAnalyzer
            cap = CapitalAnalyzer()
            data = {
                "northbound": cap.fetch_northbound_flow(),
                "northbound_top10": cap.fetch_northbound_top10(),
                "northbound_industry": cap.fetch_northbound_industry_flow(),
                "main_force": cap.fetch_main_force_flow(),
                "dragon_tiger": cap.fetch_dragon_tiger(),
            }
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _failure("Capital flow", e)

    @app.route("/api/market")
    def api_market():
        """Return market overview, trend, limit-up/down and sector data."""
        try:
            from engine import fetch_market_overview, fetch_market_trend, fetch_zt_dt, fetch_hot_sectors
            data = {
                "overview": fetch_market_overview(),
                "trend": fetch_market_trend(),
                "zt_dt": fetch_zt_dt(),
                "sectors": fetch_hot_sectors(),
            }
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _failure("Market data", e)

    @app.route("/api/sentiment")
    def api_sentiment():
        """Return sentiment indicators (stats, fear/greed, margin trading)."""
        try:
            from analysis.sentiment import SentimentAnalyzer
            sent = SentimentAnalyzer()
            data = {
                "stats": sent.fetch_market_stats(),
                "fear_greed": sent.fetch_fear_greed_index(),
                "margin": sent.fetch_margin_trading(),
            }
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _failure("Sentiment", e)

    @app.route("/api/status")
    def api_status():
        """Return system status: version, trading state, cache and limiter stats."""
        # Wrapped like the other endpoints so a failing helper yields the
        # JSON error envelope instead of Flask's default HTML 500 page.
        try:
            cache = get_cache()
            rl = get_rate_limiter()
            return jsonify({
                "ok": True,
                "data": {
                    "version": VERSION,
                    "trading_status": format_trading_status(),
                    "is_trading_day": is_trading_day(),
                    "market_phase": get_market_phase(),
                    "market_is_open": market_is_open(),
                    "cache": cache.stats(),
                    "rate_limiter": rl.stats(),
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                }
            })
        except Exception as e:
            return _failure("Status", e)

    @app.route("/api/watchlist", methods=["GET"])
    def api_watchlist_get():
        """Return the watchlist."""
        try:
            from analysis.watchlist import WatchlistManager
            wm = WatchlistManager()
            return jsonify({"ok": True, "data": wm.list()})
        except Exception as e:
            return _failure("Watchlist list", e)

    @app.route("/api/watchlist", methods=["POST"])
    def api_watchlist_add():
        """Add a stock to the watchlist.

        Expects a JSON object body with a non-empty string ``code`` and
        optional ``name`` and ``group`` fields. Responds with 400 when the
        body is not a JSON object or ``code`` is missing or blank.
        """
        # silent=True yields None for a missing, non-JSON or malformed body
        # instead of raising, so bad client input maps to 400 rather than 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return _bad_request("Request body must be a JSON object")

        code = payload.get("code")
        if not isinstance(code, str) or not code.strip():
            return _bad_request("Missing required field: code")

        try:
            from analysis.watchlist import WatchlistManager
            wm = WatchlistManager()
            wm.add(code.strip(), payload.get("name", ""), payload.get("group", "默认"))
            return jsonify({"ok": True})
        except Exception as e:
            return _failure("Watchlist add", e)

    @app.route("/api/history")
    def api_history():
        """Return recent history snapshots.

        The ``days`` query parameter (default 7) is passed to
        ``load_recent_snapshots``; a non-integer value falls back to 7.
        """
        try:
            from utils.common import load_recent_snapshots
            days = request.args.get("days", 7, type=int)
            snapshots = load_recent_snapshots(days)
            return jsonify({"ok": True, "data": snapshots})
        except Exception as e:
            return _failure("History", e)

    return app


if __name__ == "__main__":
    # The server binds to all interfaces, so the Werkzeug interactive debugger
    # (which permits arbitrary code execution) must not be enabled by default.
    # Opt in explicitly for local development with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app = create_app()
    app.run(host="0.0.0.0", port=WEB_PORT, debug=debug_enabled)