文件预览

ga_insights.py

查看 Google Analytics Insights 技能包中的文件内容。

文件内容

ga_insights.py

#!/usr/bin/env python3
"""
GA Insights - AI-Powered Google Analytics Assistant
OpenClaw Skill v0.3.0

Enhanced with:
- Richer natural language understanding
- Bounce rate, page performance, device, geo, hourly/daily trends
- Actionable insights & recommendations
- In-memory caching (5-10 min TTL)
- Better error handling and setup guidance
"""

import os
import sys
import json
import re
import time
from datetime import datetime, timedelta
from pathlib import Path

# ─── Optional GA SDK ──────────────────────────────────────────────────────────
try:
    from google.analytics.data_v1beta import BetaAnalyticsDataClient
    from google.analytics.data_v1beta import (
        RunReportRequest, RunRealtimeReportRequest,
        DateRange, Dimension, Metric, OrderBy, FilterExpression, Filter
    )
    GA_SDK_AVAILABLE = True
except ImportError:
    GA_SDK_AVAILABLE = False

# ─── Paths ────────────────────────────────────────────────────────────────────
CONFIG_DIR  = Path.home() / ".openclaw" / "ga-insights"
CONFIG_FILE = CONFIG_DIR / "config.json"
CACHE_FILE  = CONFIG_DIR / "cache.json"
DEFAULT_CREDENTIALS = str(Path.home() / ".openclaw" / "ga-insights-key.json")
SERVICE_ACCOUNT_EMAIL = "ga-insights@plucky-engine-488015-d4.iam.gserviceaccount.com"

CACHE_TTL   = 300   # 5 minutes default
CACHE_TTL_REALTIME = 30  # 30s for realtime


# ══════════════════════════════════════════════════════════════════════════════
# CACHE
# ══════════════════════════════════════════════════════════════════════════════

def _load_cache() -> dict:
    try:
        if CACHE_FILE.exists():
            with open(CACHE_FILE) as f:
                return json.load(f)
    except Exception:
        pass
    return {}

def _save_cache(cache: dict):
    try:
        CONFIG_DIR.mkdir(parents=True, exist_ok=True)
        with open(CACHE_FILE, 'w') as f:
            json.dump(cache, f)
    except Exception:
        pass

def cache_get(key: str, ttl: int = CACHE_TTL):
    cache = _load_cache()
    entry = cache.get(key)
    if entry and (time.time() - entry["ts"]) < ttl:
        return entry["data"]
    return None

def cache_set(key: str, data):
    cache = _load_cache()
    cache[key] = {"ts": time.time(), "data": data}
    # Keep cache lean – drop entries older than 1h
    now = time.time()
    cache = {k: v for k, v in cache.items() if now - v["ts"] < 3600}
    _save_cache(cache)

def cache_clear():
    if CACHE_FILE.exists():
        CACHE_FILE.unlink()
    return {"message": "Cache cleared."}


# ══════════════════════════════════════════════════════════════════════════════
# CONFIG
# ══════════════════════════════════════════════════════════════════════════════

def load_config() -> dict:
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    if CONFIG_FILE.exists():
        with open(CONFIG_FILE) as f:
            return json.load(f)
    return {"property_id": None, "connected": False}

def save_config(config: dict):
    CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    with open(CONFIG_FILE, 'w') as f:
        json.dump(config, f, indent=2)

def get_client():
    """Return initialised GA Data API client or None."""
    if not GA_SDK_AVAILABLE:
        return None
    config = load_config()
    creds = config.get("credentials_path") or DEFAULT_CREDENTIALS
    if os.path.exists(creds):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = creds
    try:
        return BetaAnalyticsDataClient()
    except Exception:
        return None


# ══════════════════════════════════════════════════════════════════════════════
# SETUP
# ══════════════════════════════════════════════════════════════════════════════

def setup_guide() -> dict:
    config = load_config()
    if config.get("connected"):
        return {
            "status": "already_connected",
            "property_id": config.get("property_id"),
            "message": f"✅ Already connected to GA4 property {config.get('property_id')}."
        }
    return {
        "status": "setup_needed",
        "service_account_email": SERVICE_ACCOUNT_EMAIL,
        "steps": [
            "1. Go to https://analytics.google.com/",
            "2. Admin (⚙) → Property Access Management",
            "3. Click + → Add users",
            f"4. Enter: {SERVICE_ACCOUNT_EMAIL}",
            "5. Role: Viewer → Save",
            "6. Admin → Property Settings → copy the numeric Property ID",
            "7. Tell me: 'ga connect <Property ID>'"
        ],
        "note": "The service account only has read access — your data stays private."
    }

def complete_setup(property_id: str) -> dict:
    pid = property_id.strip().lstrip("properties/")
    config = load_config()
    config.update({"property_id": pid, "credentials_path": DEFAULT_CREDENTIALS, "connected": True})
    save_config(config)

    try:
        client = get_client()
        if client:
            client.run_report(RunReportRequest(
                property=f"properties/{pid}",
                date_ranges=[DateRange(start_date="today", end_date="today")],
                metrics=[Metric(name="screenPageViews")]
            ))
        return {"status": "success", "property_id": pid,
                "message": f"✅ Connected! Property {pid} is now linked."}
    except Exception as e:
        err = str(e)
        if "not found" in err.lower() or "permission" in err.lower():
            hint = ("Double-check you added the service account email in "
                    "GA4 Admin → Property Access Management.")
        elif "credentials" in err.lower():
            hint = "Service account key file not found. Contact support."
        else:
            hint = "Check your network and try again."
        return {"status": "error", "message": f"Connection failed: {err}", "hint": hint}


# ══════════════════════════════════════════════════════════════════════════════
# LOW-LEVEL QUERY HELPER
# ══════════════════════════════════════════════════════════════════════════════

def _not_connected() -> dict:
    return {
        "error": "Not connected to GA4.",
        "fix": "Run 'ga setup' for instructions, then 'ga connect <Property ID>'."
    }

def _sdk_missing() -> dict:
    return {
        "error": "Google Analytics SDK not installed.",
        "fix": "Run: pip install google-analytics-data"
    }

def _api_error(e: Exception, context: str = "") -> dict:
    err = str(e)
    if "quota" in err.lower():
        msg = "GA4 API quota exceeded. Try again later."
    elif "permission" in err.lower() or "403" in err:
        msg = "Permission denied. Make sure the service account has Viewer access in GA4."
    elif "not found" in err.lower() or "404" in err:
        msg = "Property not found. Check your Property ID in GA4 Admin → Property Settings."
    elif "credentials" in err.lower() or "authentication" in err.lower():
        msg = "Authentication failed. Service account key may be missing or invalid."
    else:
        msg = f"GA4 API error: {err}"
    return {"error": msg, "context": context}

def run_report(metrics: list, dimensions: list = None,
               date_ranges: list = None, days: int = 7,
               order_by: list = None, limit: int = 20,
               cache_key: str = None, cache_ttl: int = CACHE_TTL) -> dict:
    """
    Thin wrapper around RunReportRequest with caching.
    Returns {"rows": [...], "days": int} or {"error": ...}
    """
    if cache_key:
        cached = cache_get(cache_key, cache_ttl)
        if cached is not None:
            return cached

    config = load_config()
    if not config.get("connected"):
        return _not_connected()
    if not GA_SDK_AVAILABLE:
        return _sdk_missing()

    client = get_client()
    if not client:
        return {"error": "Could not initialise GA4 client. Check credentials."}

    pid = config["property_id"]

    if date_ranges is None:
        end = datetime.now()
        start = end - timedelta(days=days - 1)
        date_ranges = [DateRange(
            start_date=start.strftime('%Y-%m-%d'),
            end_date=end.strftime('%Y-%m-%d')
        )]

    kwargs = {
        "property": f"properties/{pid}",
        "date_ranges": date_ranges,
        "metrics": [Metric(name=m) for m in metrics],
        "limit": limit,
    }
    if dimensions:
        kwargs["dimensions"] = [Dimension(name=d) for d in dimensions]
    if order_by:
        kwargs["order_bys"] = order_by

    try:
        response = client.run_report(RunReportRequest(**kwargs))
        result = {"rows": response.rows, "days": days, "raw": response}
        if cache_key:
            cache_set(cache_key, result)
        return result
    except Exception as e:
        return _api_error(e, f"metrics={metrics} dims={dimensions}")


# ══════════════════════════════════════════════════════════════════════════════
# INSIGHT HELPERS
# ══════════════════════════════════════════════════════════════════════════════

def _pct_change(new: float, old: float) -> float:
    if old == 0:
        return 0.0
    return round((new - old) / old * 100, 1)

def _trend_label(pct: float) -> str:
    if pct > 20:   return "📈 significantly up"
    if pct > 5:    return "↑ up"
    if pct < -20:  return "📉 significantly down"
    if pct < -5:   return "↓ down"
    return "→ roughly flat"

def _bounce_insight(rate: float) -> str:
    if rate < 30:
        return "Excellent bounce rate — users are very engaged."
    if rate < 50:
        return "Good bounce rate — solid engagement."
    if rate < 70:
        return "Moderate bounce rate. Consider improving page load speed, content relevance, and clear CTAs."
    return ("High bounce rate ⚠️  — many visitors leave without interacting. "
            "Check page load speed (aim <3s), above-the-fold content, and mobile experience.")

def _session_duration_insight(seconds: float) -> str:
    mins = seconds / 60
    if mins < 1:
        return "Very short sessions — users aren't finding what they need."
    if mins < 2:
        return "Short sessions — content may not be engaging enough."
    if mins < 5:
        return "Decent session length — users are spending some time."
    return "Great session duration — users are deeply engaged."

def _traffic_drop_insight(pct: float, period: str = "vs last week") -> str:
    if pct < -30:
        return (f"⚠️  Traffic is {abs(pct):.0f}% lower {period}. Investigate: "
                "check for crawl errors in Google Search Console, recent deployments, "
                "or marketing campaign pauses.")
    if pct < -10:
        return (f"Traffic is {abs(pct):.0f}% lower {period}. Worth investigating — "
                "check referral sources and any recent site changes.")
    if pct > 30:
        return f"🎉 Traffic is {abs(pct):.0f}% higher {period}! Check which sources drove the spike."
    return f"Traffic is {abs(pct):.0f}% {'higher' if pct > 0 else 'lower'} {period} — within normal range."


# ══════════════════════════════════════════════════════════════════════════════
# QUERY FUNCTIONS
# ══════════════════════════════════════════════════════════════════════════════

def get_traffic_summary(days: int = 7) -> dict:
    """Overall traffic overview with daily trend."""
    result = run_report(
        metrics=["screenPageViews", "totalUsers", "sessions",
                 "averageSessionDuration", "bounceRate"],
        dimensions=["date"],
        days=days,
        cache_key=f"traffic_summary_{days}",
    )
    if "error" in result:
        return result

    rows = result["rows"]
    if not rows:
        return {"message": "No data found for this period.", "days": days}

    total_views    = sum(int(r.metric_values[0].value) for r in rows)
    total_users    = sum(int(r.metric_values[1].value) for r in rows)
    total_sessions = sum(int(r.metric_values[2].value) for r in rows)
    avg_duration   = sum(float(r.metric_values[3].value) for r in rows) / len(rows)
    avg_bounce     = sum(float(r.metric_values[4].value) for r in rows) / len(rows) * 100

    # Daily breakdown (sorted by date)
    daily = sorted([{
        "date": r.dimension_values[0].value,
        "views": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
    } for r in rows], key=lambda x: x["date"])

    # Find best/worst day
    best  = max(daily, key=lambda x: x["views"])
    worst = min(daily, key=lambda x: x["views"])

    return {
        "period": f"Last {days} days",
        "page_views": total_views,
        "users": total_users,
        "sessions": total_sessions,
        "avg_session_duration_sec": round(avg_duration, 0),
        "bounce_rate_pct": round(avg_bounce, 1),
        "daily": daily,
        "best_day": best,
        "worst_day": worst,
        "insights": [
            f"📊 {total_users:,} users · {total_sessions:,} sessions · {total_views:,} page views over {days} days.",
            f"⏱  Average session: {avg_duration/60:.1f} min — {_session_duration_insight(avg_duration)}",
            f"🏆 Best day: {best['date']} ({best['views']:,} views) | "
            f"Slowest: {worst['date']} ({worst['views']:,} views).",
            _bounce_insight(avg_bounce),
        ]
    }

def get_traffic_sources(days: int = 7) -> dict:
    """Traffic breakdown by source/medium."""
    result = run_report(
        metrics=["sessions", "totalUsers", "bounceRate"],
        dimensions=["sessionSource", "sessionMedium"],
        days=days,
        limit=15,
        order_by=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name="sessions"), desc=True)],
        cache_key=f"traffic_sources_{days}",
    )
    if "error" in result:
        return result

    sources = [{
        "source": r.dimension_values[0].value,
        "medium": r.dimension_values[1].value,
        "sessions": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
        "bounce_pct": round(float(r.metric_values[2].value) * 100, 1),
    } for r in result["rows"]]

    total_sessions = sum(s["sessions"] for s in sources)
    for s in sources:
        s["share_pct"] = round(s["sessions"] / total_sessions * 100, 1) if total_sessions else 0

    top = sources[0] if sources else {}
    insights = [
        f"Top source: {top.get('source','?')}/{top.get('medium','?')} "
        f"({top.get('share_pct',0):.0f}% of sessions)." if top else "No source data.",
    ]
    # Flag heavy bounce sources
    high_bounce = [s for s in sources if s["bounce_pct"] > 70 and s["sessions"] > 10]
    if high_bounce:
        insights.append("⚠️  High-bounce sources (>70%): " +
                        ", ".join(f"{s['source']}/{s['medium']}" for s in high_bounce[:3]) +
                        ". Review landing pages for these channels.")

    return {"period": f"Last {days} days", "sources": sources, "insights": insights}

def get_realtime_users() -> dict:
    """Current active users."""
    config = load_config()
    if not config.get("connected"):
        return _not_connected()
    if not GA_SDK_AVAILABLE:
        return _sdk_missing()

    cached = cache_get("realtime", CACHE_TTL_REALTIME)
    if cached:
        return cached

    client = get_client()
    try:
        resp = client.run_realtime_report(RunRealtimeReportRequest(
            property=f"properties/{config['property_id']}",
            dimensions=[Dimension(name="country"), Dimension(name="deviceCategory")],
            metrics=[Metric(name="activeUsers")]
        ))
        breakdown = [{
            "country": r.dimension_values[0].value,
            "device": r.dimension_values[1].value,
            "users": int(r.metric_values[0].value),
        } for r in resp.rows]
        total = sum(b["users"] for b in breakdown)
        result = {
            "active_users": total,
            "breakdown": breakdown[:10],
            "status": "live",
            "insight": (f"🟢 {total} user{'s' if total != 1 else ''} on your site right now." if total
                        else "No active users at this moment.")
        }
        cache_set("realtime", result)
        return result
    except Exception as e:
        return _api_error(e, "realtime")

def get_bounce_rate(days: int = 7) -> dict:
    """Bounce rate by page."""
    result = run_report(
        metrics=["bounceRate", "screenPageViews", "averageSessionDuration"],
        dimensions=["pagePath"],
        days=days,
        limit=20,
        order_by=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name="screenPageViews"), desc=True)],
        cache_key=f"bounce_rate_{days}",
    )
    if "error" in result:
        return result

    pages = [{
        "page": r.dimension_values[0].value,
        "bounce_pct": round(float(r.metric_values[0].value) * 100, 1),
        "views": int(r.metric_values[1].value),
        "avg_duration_sec": round(float(r.metric_values[2].value), 0),
    } for r in result["rows"]]

    high = sorted([p for p in pages if p["bounce_pct"] > 60], key=lambda x: x["bounce_pct"], reverse=True)
    low  = sorted([p for p in pages if p["bounce_pct"] < 40], key=lambda x: x["bounce_pct"])
    overall = sum(p["bounce_pct"] * p["views"] for p in pages) / max(sum(p["views"] for p in pages), 1)

    insights = [_bounce_insight(overall)]
    if high:
        insights.append(f"Pages needing attention: {', '.join(p['page'] for p in high[:3])} — bounce >60%.")
    if low:
        insights.append(f"Best-performing pages: {', '.join(p['page'] for p in low[:3])} — excellent engagement.")

    return {
        "period": f"Last {days} days",
        "overall_bounce_pct": round(overall, 1),
        "pages": pages,
        "high_bounce_pages": high[:5],
        "low_bounce_pages": low[:5],
        "insights": insights,
    }

def get_page_performance(days: int = 7, sort_by: str = "views") -> dict:
    """Best and worst performing pages."""
    order_metric = "screenPageViews" if sort_by == "views" else "averageSessionDuration"
    result = run_report(
        metrics=["screenPageViews", "totalUsers", "averageSessionDuration",
                 "bounceRate", "engagementRate"],
        dimensions=["pagePath", "pageTitle"],
        days=days,
        limit=25,
        order_by=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name=order_metric), desc=True)],
        cache_key=f"page_perf_{days}_{sort_by}",
    )
    if "error" in result:
        return result

    pages = [{
        "path": r.dimension_values[0].value,
        "title": r.dimension_values[1].value,
        "views": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
        "avg_duration_sec": round(float(r.metric_values[2].value), 0),
        "bounce_pct": round(float(r.metric_values[3].value) * 100, 1),
        "engagement_pct": round(float(r.metric_values[4].value) * 100, 1),
    } for r in result["rows"]]

    top5    = pages[:5]
    bottom5 = sorted(pages, key=lambda x: x["views"])[:5]

    return {
        "period": f"Last {days} days",
        "top_pages": top5,
        "bottom_pages": bottom5,
        "total_pages_analysed": len(pages),
        "insights": [
            f"Top page: {top5[0]['path']} — {top5[0]['views']:,} views." if top5 else "",
            (f"Most engaging: {max(pages, key=lambda x: x['engagement_pct'])['path']} "
             f"({max(pages, key=lambda x: x['engagement_pct'])['engagement_pct']:.0f}% engagement rate).")
            if pages else "",
        ]
    }

def get_device_breakdown(days: int = 7) -> dict:
    """Traffic by device category."""
    result = run_report(
        metrics=["sessions", "totalUsers", "bounceRate", "averageSessionDuration"],
        dimensions=["deviceCategory"],
        days=days,
        cache_key=f"devices_{days}",
    )
    if "error" in result:
        return result

    devices = [{
        "device": r.dimension_values[0].value.capitalize(),
        "sessions": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
        "bounce_pct": round(float(r.metric_values[2].value) * 100, 1),
        "avg_duration_sec": round(float(r.metric_values[3].value), 0),
    } for r in result["rows"]]

    total = sum(d["sessions"] for d in devices)
    for d in devices:
        d["share_pct"] = round(d["sessions"] / total * 100, 1) if total else 0

    mobile = next((d for d in devices if "mobile" in d["device"].lower()), None)
    desktop = next((d for d in devices if "desktop" in d["device"].lower()), None)

    insights = []
    if mobile and desktop:
        if mobile["share_pct"] > 60:
            insights.append(f"📱 {mobile['share_pct']:.0f}% mobile — ensure your site is fully mobile-optimised.")
        elif desktop["share_pct"] > 70:
            insights.append(f"🖥  {desktop['share_pct']:.0f}% desktop — mobile still worth optimising for growth.")
        if mobile and mobile["bounce_pct"] > desktop["bounce_pct"] + 15:
            insights.append("Mobile bounce rate is significantly higher than desktop — check mobile UX & load speed.")

    return {"period": f"Last {days} days", "devices": devices, "insights": insights}

def get_geo_data(days: int = 7) -> dict:
    """Traffic by country."""
    result = run_report(
        metrics=["sessions", "totalUsers", "bounceRate"],
        dimensions=["country"],
        days=days,
        limit=15,
        order_by=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name="sessions"), desc=True)],
        cache_key=f"geo_{days}",
    )
    if "error" in result:
        return result

    countries = [{
        "country": r.dimension_values[0].value,
        "sessions": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
        "bounce_pct": round(float(r.metric_values[2].value) * 100, 1),
    } for r in result["rows"]]

    total = sum(c["sessions"] for c in countries)
    for c in countries:
        c["share_pct"] = round(c["sessions"] / total * 100, 1) if total else 0

    top = countries[0] if countries else {}
    return {
        "period": f"Last {days} days",
        "countries": countries,
        "insights": [
            f"🌍 Top country: {top.get('country','?')} ({top.get('share_pct',0):.0f}% of sessions)." if top else "",
            f"Audience spans {len(countries)} countries." if countries else "",
        ]
    }

def get_hourly_trends(days: int = 1) -> dict:
    """Hourly traffic distribution."""
    result = run_report(
        metrics=["sessions", "totalUsers"],
        dimensions=["hour"],
        days=days,
        order_by=[OrderBy(dimension=OrderBy.DimensionOrderBy(dimension_name="hour"))],
        cache_key=f"hourly_{days}",
    )
    if "error" in result:
        return result

    hours = [{
        "hour": int(r.dimension_values[0].value),
        "sessions": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
    } for r in result["rows"]]

    if not hours:
        return {"message": "No hourly data available."}

    peak = max(hours, key=lambda x: x["sessions"])
    low  = min(hours, key=lambda x: x["sessions"])

    return {
        "period": f"Last {days} day(s)",
        "hours": hours,
        "peak_hour": peak,
        "low_hour": low,
        "insights": [
            f"🕐 Peak traffic at {peak['hour']:02d}:00 ({peak['sessions']} sessions).",
            f"Quietest at {low['hour']:02d}:00 ({low['sessions']} sessions) — good time for maintenance.",
        ]
    }

def get_daily_trends(days: int = 30) -> dict:
    """Day-by-day traffic with week-over-week comparison."""
    result = run_report(
        metrics=["sessions", "totalUsers", "screenPageViews"],
        dimensions=["date"],
        days=days,
        order_by=[OrderBy(dimension=OrderBy.DimensionOrderBy(dimension_name="date"))],
        cache_key=f"daily_trends_{days}",
    )
    if "error" in result:
        return result

    daily = [{
        "date": r.dimension_values[0].value,
        "sessions": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
        "views": int(r.metric_values[2].value),
    } for r in result["rows"]]

    if not daily:
        return {"message": "No daily data available."}

    # Week-over-week if enough data
    insights = []
    if len(daily) >= 14:
        this_week = daily[-7:]
        last_week = daily[-14:-7]
        tw_sessions = sum(d["sessions"] for d in this_week)
        lw_sessions = sum(d["sessions"] for d in last_week)
        pct = _pct_change(tw_sessions, lw_sessions)
        insights.append(f"This week vs last week: {_trend_label(pct)} ({pct:+.0f}%). "
                        + _traffic_drop_insight(pct, "vs last week"))

    avg = sum(d["sessions"] for d in daily) / len(daily)
    anomalies = [d for d in daily if d["sessions"] < avg * 0.5]
    if anomalies:
        dates = [d["date"] for d in anomalies]
        insights.append(f"⚠️  Unusually low traffic on: {', '.join(dates)}. Check for site issues or algorithm changes.")

    return {"period": f"Last {days} days", "daily": daily, "insights": insights}

def get_conversions(days: int = 7) -> dict:
    """Conversion events with rate analysis."""
    # Total users for rate calculation
    traffic_result = run_report(
        metrics=["totalUsers"],
        days=days,
        cache_key=f"total_users_{days}",
    )

    result = run_report(
        metrics=["eventCount", "totalUsers", "eventCountPerUser"],
        dimensions=["eventName"],
        days=days,
        limit=20,
        order_by=[OrderBy(metric=OrderBy.MetricOrderBy(metric_name="eventCount"), desc=True)],
        cache_key=f"conversions_{days}",
    )
    if "error" in result:
        return result

    events = [{
        "event": r.dimension_values[0].value,
        "count": int(r.metric_values[0].value),
        "users": int(r.metric_values[1].value),
        "per_user": round(float(r.metric_values[2].value), 2),
    } for r in result["rows"]]

    total_users = 0
    if "rows" in traffic_result and traffic_result["rows"]:
        total_users = int(traffic_result["rows"][0].metric_values[0].value)

    # Likely conversion events (not standard GA noise)
    conversion_keywords = ["purchase", "sign_up", "subscribe", "lead", "submit", "register",
                          "checkout", "add_to_cart", "complete", "goal"]

    likely_conversions = [e for e in events if any(k in e["event"].lower() for k in conversion_keywords)]
    other_events = [e for e in events if not any(k in e["event"].lower() for k in conversion_keywords)]

    insights = []
    if likely_conversions:
        conv = likely_conversions[0]
        rate = (conv["users"] / total_users * 100) if total_users else 0
        insights.append(f"🎯 Top conversion: {conv['event']} — {conv['users']:,} completions "
                        f"({rate:.1f}% conversion rate).")
    else:
        insights.append("No standard conversion events detected. "
                        "Track purchases, signups, or goals via GA4 events.")

    return {
        "period": f"Last {days} days",
        "total_users": total_users,
        "conversion_events": likely_conversions[:10],
        "other_events": other_events[:10],
        "insights": insights
    }

def get_funnel_analysis(days: int = 7) -> dict:
    """Simple conversion funnel analysis."""
    result = run_report(
        metrics=["totalUsers", "sessions", "conversions"],
        dimensions=["date"],
        days=days,
        cache_key=f"funnel_{days}",
    )
    if "error" in result:
        return result

    rows = result["rows"]
    if not rows:
        return {"message": "Not enough data for funnel analysis."}

    total_users = sum(int(r.metric_values[0].value) for r in rows)
    total_sessions = sum(int(r.metric_values[1].value) for r in rows)
    total_conversions = sum(int(r.metric_values[2].value) for r in rows)

    # Calculate rates
    visitor_to_session = (total_sessions / total_users * 100) if total_users else 0
    session_to_conv = (total_conversions / total_sessions * 100) if total_sessions else 0
    overall_conv = (total_conversions / total_users * 100) if total_users else 0

    return {
        "period": f"Last {days} days",
        "visitors": total_users,
        "sessions": total_sessions,
        "conversions": total_conversions,
        "rates": {
            "visitor_to_session_pct": round(visitor_to_session, 1),
            "session_to_conversion_pct": round(session_to_conv, 2),
            "overall_conversion_pct": round(overall_conv, 2),
        },
        "insights": [
            f"📊 {visitor_to_session:.1f}% of visitors start a session.",
            f"🎯 {session_to_conv:.2f}% of sessions convert.",
            f"📈 Overall conversion rate: {overall_conv:.2f}%.",
            "Funnel tip: Focus on the biggest drop-off step for biggest impact."
        ]
    }

def compare_periods(period1_days: int = 7, period2_days: int = 7) -> dict:
    """Compare two time periods."""
    # Get data for period 1 (more recent)
    p1 = run_report(
        metrics=["sessions", "totalUsers", "screenPageViews", "bounceRate"],
        days=period1_days,
        cache_key=f"compare_p1_{period1_days}",
    )
    # Get data for period 2 (older)
    p2 = run_report(
        metrics=["sessions", "totalUsers", "screenPageViews", "bounceRate"],
        days=period2_days,
        cache_key=f"compare_p2_{period2_days}",
    )

    if "error" in p1:
        return p1
    if "error" in p2:
        return p2

    # Extract totals
    def extract_totals(r):
        return {
            "sessions": sum(int(row.metric_values[0].value) for row in r["rows"]),
            "users": sum(int(row.metric_values[1].value) for row in r["rows"]),
            "views": sum(int(row.metric_values[2].value) for row in r["rows"]),
            "bounce": sum(float(row.metric_values[3].value) * int(row.metric_values[0].value) for row in r["rows"]) /
                      max(sum(int(row.metric_values[0].value) for row in r["rows"]), 1) * 100,
        }

    d1 = extract_totals(p1)
    d2 = extract_totals(p2)

    def diff_label(current, previous, metric):
        pct = _pct_change(current, previous)
        return {
            "metric": metric,
            "current": current,
            "previous": previous,
            "change_pct": pct,
            "label": _trend_label(pct),
        }

    return {
        "period_1": f"Last {period1_days} days",
        "period_2": f"Previous {period2_days} days",
        "comparison": {
            "sessions": diff_label(d1["sessions"], d2["sessions"], "sessions"),
            "users": diff_label(d1["users"], d2["users"], "users"),
            "page_views": diff_label(d1["views"], d2["views"], "page views"),
            "bounce_rate": diff_label(d1["bounce"], d2["bounce"], "bounce rate"),
        },
        "summary": (
            f"Traffic {diff_label(d1['sessions'], d2['sessions'], 'sessions')['label']} "
            f"({diff_label(d1['sessions'], d2['sessions'], 'sessions')['change_pct']:+.0f}%) "
            f"compared to the previous period."
        )
    }

def analyze_traffic_drop(days: int = 7) -> dict:
    """Compare recent period with same period earlier."""
    return compare_periods(days, days)


# ══════════════════════════════════════════════════════════════════════════════
# NATURAL LANGUAGE PARSER
# ══════════════════════════════════════════════════════════════════════════════

def _parse_period(text: str) -> int:
    """Extract day count from natural language."""
    text = text.lower()
    numbers = re.findall(r'(\d+)', text)
    if numbers:
        num = int(numbers[0])
        if 'hour' in text:
            return min(num // 24 + 1, 30)
        if 'day' in text:
            return min(num, 365)
        if 'week' in text:
            return min(num * 7, 90)
        if 'month' in text:
            return min(num * 30, 365)
    # Default: 7 days
    return 7

def _extract_intent(text: str) -> tuple:
    """Return (intent, params) tuple."""
    text = text.lower().strip()

    # Realtime
    if any(w in text for w in ['realtime', 'right now', 'current', 'live', 'active', 'online']):
        return ('realtime', {})

    # Bounce rate
    if any(w in text for w in ['bounce', 'bouncing']):
        days = _parse_period(text)
        return ('bounce', {'days': days})

    # Pages / page performance
    if any(w in text for w in ['page', 'pages', 'best page', 'worst page', 'top page']):
        days = _parse_period(text)
        sort = 'duration' if 'engag' in text or 'time' in text else 'views'
        return ('pages', {'days': days, 'sort_by': sort})

    # Devices
    if any(w in text for w in ['device', 'mobile', 'desktop', 'tablet', 'phone']):
        days = _parse_period(text)
        return ('devices', {'days': days})

    # Geography / countries / location
    if any(w in text for w in ['country', 'countries', 'geo', 'location', 'where', 'nation']):
        days = _parse_period(text)
        return ('geo', {'days': days})

    # Hourly
    if any(w in text for w in ['hour', 'hourly', 'time of day', 'when']):
        days = _parse_period(text)
        return ('hourly', {'days': days})

    # Daily trends / over time
    if any(w in text for w in ['trend', 'daily', 'over time', 'history', 'past']):
        days = _parse_period(text)
        return ('daily', {'days': days})

    # Conversions / goals / events
    if any(w in text for w in ['convert', 'conversion', 'goal', 'purchase', 'sign up', 'signup',
                                'event', 'complete', 'submit', 'funnel']):
        days = _parse_period(text)
        if 'funnel' in text:
            return ('funnel', {'days': days})
        return ('conversions', {'days': days})

    # Traffic drop / why / decrease
    if any(w in text for w in ['drop', 'down', 'decrease', 'lower', 'less', 'why', 'dropped']):
        days = _parse_period(text)
        return ('drop', {'days': days})

    # Compare periods
    if any(w in text for w in ['compare', 'vs', 'versus', 'difference', 'change']):
        # Try to extract two periods
        periods = re.findall(r'(\d+)\s*(day|week|month)', text)
        if len(periods) >= 2:
            p1 = periods[0][0]
            p2 = periods[1][0]
            unit = periods[0][1]
            mult = 1 if unit == 'day' else 7 if unit == 'week' else 30
            p2_mult = 1 if periods[1][1] == 'day' else 7 if periods[1][1] == 'week' else 30
            return ('compare', {'period1': int(p1)*mult, 'period2': int(p2)*p2_mult})
        return ('compare', {'period1': 7, 'period2': 7})

    # Sources / traffic source / where from
    if any(w in text for w in ['source', 'sources', 'referral', 'medium', 'channel', 'where from']):
        days = _parse_period(text)
        return ('sources', {'days': days})

    # Default: traffic summary
    days = _parse_period(text)
    return ('traffic', {'days': days})


# ══════════════════════════════════════════════════════════════════════════════
# MAIN HANDLER
# ══════════════════════════════════════════════════════════════════════════════

def handle_query(user_input: str) -> dict:
    """
    Main entry point for natural language queries.
    Returns a dict with structured data and actionable insights.
    """
    if not user_input or not user_input.strip():
        return {"message": "Ask me about your analytics! Try: 'how's traffic?', "
                          "'what's my bounce rate?', or 'compare last week to the week before.'"}

    intent, params = _extract_intent(user_input)
    days = params.get('days', 7)

    try:
        if intent == 'realtime':
            return get_realtime_users()

        if intent == 'bounce':
            return get_bounce_rate(days)

        if intent == 'pages':
            return get_page_performance(days, params.get('sort_by', 'views'))

        if intent == 'devices':
            return get_device_breakdown(days)

        if intent == 'geo':
            return get_geo_data(days)

        if intent == 'hourly':
            return get_hourly_trends(days)

        if intent == 'daily':
            return get_daily_trends(days)

        if intent == 'conversions':
            return get_conversions(days)

        if intent == 'funnel':
            return get_funnel_analysis(days)

        if intent == 'drop':
            return analyze_traffic_drop(days)

        if intent == 'compare':
            return compare_periods(params.get('period1', 7), params.get('period2', 7))

        if intent == 'sources':
            return get_traffic_sources(days)

        # Default: traffic summary
        return get_traffic_summary(days)

    except Exception as e:
        return _api_error(e, f"intent={intent}")


# ══════════════════════════════════════════════════════════════════════════════
# CLI / MAIN
# ══════════════════════════════════════════════════════════════════════════════

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="GA Insights CLI")
    sub = parser.add_subparsers(dest="command", help="Commands")

    # Setup
    sub.add_parser("setup", help="Show setup guide")
    sub.add_parser("status", help="Check connection status")

    # Queries
    p = sub.add_parser("traffic", help="Traffic summary")
    p.add_argument("--days", type=int, default=7)

    p = sub.add_parser("sources", help="Traffic sources")
    p.add_argument("--days", type=int, default=7)

    p = sub.add_parser("realtime", help="Active users now")
    sub.add_parser("bounce", help="Bounce rate analysis")
    sub.add_parser("pages", help="Page performance")
    sub.add_parser("devices", help="Device breakdown")
    sub.add_parser("geo", help="Geographic data")
    sub.add_parser("hourly", help="Hourly trends")
    sub.add_parser("daily", help="Daily trends")
    sub.add_parser("conversions", help="Conversion events")
    sub.add_parser("funnel", help="Funnel analysis")

    p = sub.add_parser("compare", help="Compare periods")
    p.add_argument("--period1", type=int, default=7, help="Recent period days")
    p.add_argument("--period2", type=int, default=7, help="Previous period days")

    p = sub.add_parser("drop", help="Traffic drop analysis")
    p.add_argument("--days", type=int, default=7)

    sub.add_parser("cache", help="Clear cache")

    # Query mode
    sub.add_parser("query", help="Natural language query")
    sub.add_argument("query", nargs="...", help="Query string")

    args = parser.parse_args()

    # Helper to print JSON
    def out(data):
        print(json.dumps(data, indent=2, default=str))

    if args.command == "setup":
        out(setup_guide())

    elif args.command == "status":
        out(load_config())

    elif args.command == "traffic":
        out(get_traffic_summary(args.days))

    elif args.command == "sources":
        out(get_traffic_sources(args.days))

    elif args.command == "realtime":
        out(get_realtime_users())

    elif args.command == "bounce":
        out(get_bounce_rate())

    elif args.command == "pages":
        out(get_page_performance())

    elif args.command == "devices":
        out(get_device_breakdown())

    elif args.command == "geo":
        out(get_geo_data())

    elif args.command == "hourly":
        out(get_hourly_trends())

    elif args.command == "daily":
        out(get_daily_trends())

    elif args.command == "conversions":
        out(get_conversions())

    elif args.command == "funnel":
        out(get_funnel_analysis())

    elif args.command == "compare":
        out(compare_periods(args.period1, args.period2))

    elif args.command == "drop":
        out(analyze_traffic_drop(args.days))

    elif args.command == "cache":
        out(cache_clear())

    elif args.command == "query":
        # Join remaining args into query string
        query = " ".join(args.query)
        out(handle_query(query))

    else:
        parser.print_help()