# File: scripts/web/app.py
#!/usr/bin/env python3
"""
A股分析 v3.0 — Web Dashboard
Flask + Chart.js 实时行情大屏
"""
import sys
import os
import json
import logging
from datetime import datetime
# Resolve the directory one level above this file's folder and put it at the
# front of sys.path, ahead of the project-local imports that follow.
_WEB_DIR = os.path.dirname(os.path.abspath(__file__))
SCRIPT_DIR = os.path.dirname(_WEB_DIR)
sys.path.insert(0, SCRIPT_DIR)
from flask import Flask, render_template, jsonify, request
from config import VERSION, WEB_PORT
from holiday import is_trading_day, format_trading_status, market_is_open, get_market_phase
from data_pipeline import get_cache, get_rate_limiter
# Module-level logger; records are emitted under the "astock.web" logger name.
logger = logging.getLogger("astock.web")
def create_app():
    """Create and configure the Flask dashboard application.

    Registers the dashboard page plus the JSON API routes. API routes answer
    with ``{"ok": True, ...}`` on success and
    ``{"ok": False, "error": "<message>"}`` together with an HTTP error status
    (400, 404 or 500) on failure.

    Returns:
        The configured ``Flask`` application instance.
    """
    template_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates")
    app = Flask(__name__, template_folder=template_dir)

    # Emit non-ASCII text (e.g. Chinese) verbatim in JSON responses.
    # Older Flask releases read the JSON_AS_ASCII config key; newer releases
    # dropped it in favour of the JSON provider attribute, so set both.
    app.config["JSON_AS_ASCII"] = False
    json_provider = getattr(app, "json", None)
    if json_provider is not None:
        json_provider.ensure_ascii = False

    def _server_error(context, exc):
        """Log *exc* with its traceback and build the JSON 500 response."""
        # Must be called from inside an ``except`` block so that
        # logger.exception can attach the active traceback.
        logger.exception("%s: %s", context, exc)
        return jsonify({"ok": False, "error": str(exc)}), 500

    def _bad_request(message):
        """Build the JSON 400 response for an invalid client request."""
        return jsonify({"ok": False, "error": message}), 400

    @app.route("/")
    def index():
        """Render the dashboard home page."""
        return render_template("dashboard.html", version=VERSION)

    @app.route("/api/data")
    def api_data():
        """Return the full data set produced by ``engine.collect_all_data``."""
        try:
            # Imported lazily, at request time.
            from engine import collect_all_data

            class Args:
                # Option holder handed to collect_all_data; the meaning of each
                # attribute is defined by the engine module (presumably mirrors
                # its command-line options -- confirm against engine).
                no_weibo = False
                no_market = False
                platforms = None
                proxy = None
                watchlist = None

            data = collect_all_data(Args())
            # Remove non-serializable fields
            data.pop("_cache_stats", None)
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _server_error("Data fetch error", e)

    @app.route("/api/stock/<code>")
    def api_stock(code):
        """Return quote data for one stock, enriched with its fund flow.

        Responds with 404 when ``fetch_single_stock`` returns nothing for *code*.
        """
        try:
            from engine import fetch_single_stock
            from analysis.capital import CapitalAnalyzer

            info = fetch_single_stock(code)
            if not info:
                return jsonify({"ok": False, "error": f"Stock {code} not found"}), 404
            cap = CapitalAnalyzer()
            info["fund_flow"] = cap.fetch_single_stock_fund_flow(code)
            return jsonify({"ok": True, "data": info})
        except Exception as e:
            return _server_error("Stock query error", e)

    @app.route("/api/hotsearch")
    def api_hotsearch():
        """Return stock-related hot-search items with sentiment counts.

        Only the first 30 items are included in the response; ``total``
        reports the size of the full filtered list.
        """
        try:
            from sources.multi_platform import (
                fetch_all_hot_sources,
                filter_stock_keywords,
                merge_multi_platform_results,
            )
            from analysis.hotsearch import HotSearchAnalyzer

            all_hot = fetch_all_hot_sources()
            merged = merge_multi_platform_results(all_hot)
            stock_hot = filter_stock_keywords(merged)
            analyzer = HotSearchAnalyzer()
            result = analyzer.batch_sentiment_analysis(stock_hot)
            return jsonify({
                "ok": True,
                "data": {
                    "total": len(stock_hot),
                    "positive": len(result["positive"]),
                    "negative": len(result["negative"]),
                    "neutral": len(result["neutral"]),
                    "items": stock_hot[:30],
                },
            })
        except Exception as e:
            return _server_error("Hot search fetch error", e)

    @app.route("/api/capital")
    def api_capital():
        """Return the capital-flow data sets fetched via ``CapitalAnalyzer``."""
        try:
            from analysis.capital import CapitalAnalyzer

            cap = CapitalAnalyzer()
            data = {
                "northbound": cap.fetch_northbound_flow(),
                "northbound_top10": cap.fetch_northbound_top10(),
                "northbound_industry": cap.fetch_northbound_industry_flow(),
                "main_force": cap.fetch_main_force_flow(),
                "dragon_tiger": cap.fetch_dragon_tiger(),
            }
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _server_error("Capital flow fetch error", e)

    @app.route("/api/market")
    def api_market():
        """Return market overview, trend, limit-up/down and hot-sector data."""
        try:
            from engine import fetch_market_overview, fetch_market_trend, fetch_zt_dt, fetch_hot_sectors

            data = {
                "overview": fetch_market_overview(),
                "trend": fetch_market_trend(),
                "zt_dt": fetch_zt_dt(),
                "sectors": fetch_hot_sectors(),
            }
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _server_error("Market data fetch error", e)

    @app.route("/api/sentiment")
    def api_sentiment():
        """Return the sentiment indicators fetched via ``SentimentAnalyzer``."""
        try:
            from analysis.sentiment import SentimentAnalyzer

            sent = SentimentAnalyzer()
            data = {
                "stats": sent.fetch_market_stats(),
                "fear_greed": sent.fetch_fear_greed_index(),
                "margin": sent.fetch_margin_trading(),
            }
            return jsonify({"ok": True, "data": data})
        except Exception as e:
            return _server_error("Sentiment fetch error", e)

    @app.route("/api/status")
    def api_status():
        """Return version, trading-calendar state, cache and rate-limiter stats."""
        # Guarded like the other API routes so a failing stats call yields a
        # JSON error body instead of Flask's default HTML 500 page.
        try:
            cache = get_cache()
            rl = get_rate_limiter()
            return jsonify({
                "ok": True,
                "data": {
                    "version": VERSION,
                    "trading_status": format_trading_status(),
                    "is_trading_day": is_trading_day(),
                    "market_phase": get_market_phase(),
                    "market_is_open": market_is_open(),
                    "cache": cache.stats(),
                    "rate_limiter": rl.stats(),
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                },
            })
        except Exception as e:
            return _server_error("Status query error", e)

    @app.route("/api/watchlist", methods=["GET"])
    def api_watchlist_get():
        """Return the watchlist entries from ``WatchlistManager.list()``."""
        try:
            from analysis.watchlist import WatchlistManager

            wm = WatchlistManager()
            return jsonify({"ok": True, "data": wm.list()})
        except Exception as e:
            return _server_error("Watchlist fetch error", e)

    @app.route("/api/watchlist", methods=["POST"])
    def api_watchlist_add():
        """Add a stock to the watchlist.

        Expects a JSON object with ``code`` (required) and optional ``name``
        and ``group``. Responds with 400 when the body is not a JSON object or
        ``code`` is missing/blank.
        """
        # silent=True yields None for a missing, malformed or non-JSON body
        # instead of raising, so client mistakes map to 400 rather than 500.
        payload = request.get_json(silent=True)
        if not isinstance(payload, dict):
            return _bad_request("Request body must be a JSON object")
        code = str(payload.get("code") or "").strip()
        if not code:
            return _bad_request("Missing required field: code")
        try:
            from analysis.watchlist import WatchlistManager

            wm = WatchlistManager()
            wm.add(code, payload.get("name", ""), payload.get("group", "默认"))
            return jsonify({"ok": True})
        except Exception as e:
            return _server_error("Watchlist add error", e)

    @app.route("/api/history")
    def api_history():
        """Return recent snapshots for the last ``days`` days (query arg, default 7)."""
        try:
            from utils.common import load_recent_snapshots

            # type=int makes a non-integer ``days`` value fall back to the default.
            days = request.args.get("days", 7, type=int)
            snapshots = load_recent_snapshots(days)
            return jsonify({"ok": True, "data": snapshots})
        except Exception as e:
            return _server_error("History fetch error", e)

    return app
if __name__ == "__main__":
    # The server binds to all interfaces, and the Werkzeug interactive debugger
    # lets anyone who can reach it execute arbitrary code. Keep debug mode off
    # unless it is explicitly requested through the FLASK_DEBUG environment
    # variable (e.g. FLASK_DEBUG=1 for local development).
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app = create_app()
    app.run(host="0.0.0.0", port=WEB_PORT, debug=debug_enabled)