文件预览

capture_keyframes.py

查看 Video Notes 技能包中的文件内容。

文件内容

scripts/capture_keyframes.py

#!/usr/bin/env python3
"""
Identify key subtitle moments and capture video frames at those timestamps.

Usage:
    python3 capture_keyframes.py <youtube_url> <subtitles.json> [--output-dir <dir>] [--max-frames <n>]

Output:
    JSON array of keyframe objects:
    [{"t": "mm:ss", "s": 123.4, "text": "...", "image_b64": "...", "reason": "..."}]

How it works:
1. Score each subtitle entry for "keyness" using heuristics
2. Select top N key moments (spread across the video)
3. Download only the relevant video sections (a few seconds each) via yt-dlp
4. Extract frames with ffmpeg
5. Return base64-encoded JPEG images
"""

import sys
import os
import re
import json
import base64
import subprocess
import tempfile
import argparse
import math


# ── Heuristics for identifying key moments ──────────────────────────────────

# Phrases that signal important structural moments
KEY_PHRASES = [
    # Topic introductions
    r"\bintroducing\b", r"\bintroduce\b", r"\bpresent(ing)?\b", r"\bannounce\b",
    r"\blet('s| us) (look|talk|start|begin|dive)\b",
    r"\bfirst[,.]?\s", r"\bnext[,.]?\s", r"\bfinally[,.]?\s",
    r"\bstep \d+\b", r"\bphase \d+\b",
    # Conclusions / key points
    r"\bin summary\b", r"\bthe key (insight|takeaway|point|finding)\b",
    r"\bmost important(ly)?\b", r"\bcrucially?\b", r"\bfundamentally\b",
    r"\bthe result is\b", r"\bwe found\b", r"\bwe discover(ed)?\b",
    # Demos / shows
    r"\blet('s| us) (watch|see|look at|show)\b", r"\bhere (is|are|you (can )?see)\b",
    r"\bdemonstrat(e|ing|ion)\b", r"\bin action\b",
    # Numbers / data
    r"\b\d+[kKmMbB%]?\s*(hours?|times?|years?|percent|x)\b",
    r"\b(zero|one|two|three|four|five|six|seven|eight|nine|ten)\s+\w+\b",
    # Transitions between big ideas
    r"\b(and )?now[,.]?\s*(let('s)?|we)\b", r"\bmoving on\b", r"\bso[,.]?\s+how\b",
    r"\bthe (first|second|third|fourth|final)\b",
]

KEY_PHRASE_PATTERNS = [re.compile(p, re.IGNORECASE) for p in KEY_PHRASES]

# Words that boost importance score
HIGH_VALUE_WORDS = {
    "introducing", "announcing", "revolutionary", "breakthrough", "first",
    "zero", "100%", "million", "billion", "key", "crucial", "fundamental",
    "result", "discover", "show", "demonstrate", "watch", "see",
}


def score_subtitle(entry: dict) -> float:
    """Score a subtitle entry 0.0–1.0 for keyness."""
    text = entry["text"].lower()
    score = 0.0

    # Pattern matches
    for pat in KEY_PHRASE_PATTERNS:
        if pat.search(text):
            score += 0.15

    # High-value word matches
    words = set(re.findall(r"\b\w+\b", text))
    score += len(words & HIGH_VALUE_WORDS) * 0.1

    # Longer sentences tend to be more substantive
    word_count = len(text.split())
    if word_count > 12:
        score += 0.1
    if word_count > 20:
        score += 0.1

    # Sentences ending with "." are often complete thoughts
    if text.rstrip().endswith("."):
        score += 0.05

    return min(score, 1.0)


def select_keyframes(entries: list[dict], max_frames: int = 8) -> list[dict]:
    """
    Select up to max_frames key entries, distributed across the video duration.
    Ensures no two selected frames are within 60 seconds of each other.
    """
    if not entries:
        return []

    # Score all entries
    scored = [(score_subtitle(e), e) for e in entries]

    # Sort by score descending
    scored.sort(key=lambda x: x[0], reverse=True)

    selected = []
    used_times = []

    for score, entry in scored:
        if len(selected) >= max_frames:
            break
        if score < 0.1:
            break
        t = entry["s"]
        # Enforce minimum gap of 60 seconds between frames
        if any(abs(t - u) < 60 for u in used_times):
            continue
        selected.append({**entry, "score": round(score, 3)})
        used_times.append(t)

    # Sort selected by timestamp for final output
    selected.sort(key=lambda x: x["s"])
    return selected


# ── Video download & frame extraction ───────────────────────────────────────

def ensure_yt_dlp():
    try:
        import yt_dlp  # noqa
    except ImportError:
        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "yt-dlp", "-q", "--break-system-packages"],
            stderr=subprocess.DEVNULL,
        )


def download_section(url: str, start_sec: float, duration: float, out_path: str) -> bool:
    """Download a short video section using yt-dlp --download-sections."""
    start = max(0, start_sec - 1)  # 1s before for context
    end = start + duration + 2     # a bit extra

    mm_start = f"{int(start)//60:02d}:{int(start)%60:02d}"
    mm_end   = f"{int(end)//60:02d}:{int(end)%60:02d}"
    section  = f"*{mm_start}-{mm_end}"

    cmd = [
        sys.executable, "-m", "yt_dlp",
        "--download-sections", section,
        "--format", "worst[ext=mp4]/worst",
        "--no-playlist",
        "--quiet",
        "-o", out_path,
        url,
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    return result.returncode == 0 and os.path.exists(out_path)


def extract_frame(video_path: str, offset_sec: float, out_path: str) -> bool:
    """Extract a single frame from a video file at offset_sec using ffmpeg."""
    cmd = [
        "ffmpeg", "-y",
        "-ss", str(max(0, offset_sec)),
        "-i", video_path,
        "-frames:v", "1",
        "-q:v", "3",          # JPEG quality (2=best, 31=worst)
        "-vf", "scale=960:-2",  # resize to 960px wide
        out_path,
    ]
    result = subprocess.run(cmd, capture_output=True)
    return result.returncode == 0 and os.path.exists(out_path)


def image_to_b64(path: str) -> str:
    with open(path, "rb") as f:
        return base64.b64encode(f.read()).decode()


# ── Main ─────────────────────────────────────────────────────────────────────

def main():
    parser = argparse.ArgumentParser(description="Capture keyframes from a YouTube video")
    parser.add_argument("url", help="YouTube video URL")
    parser.add_argument("subtitles", help="Path to subtitles JSON (from extract_subtitles.py)")
    parser.add_argument("--output-dir", "-o", default="/tmp", help="Directory to save output JSON")
    parser.add_argument("--max-frames", "-n", type=int, default=8, help="Max keyframes to capture (default: 8)")
    parser.add_argument("--output-json", help="Output JSON file path (default: stdout)")
    args = parser.parse_args()

    ensure_yt_dlp()

    # Load subtitles
    with open(args.subtitles) as f:
        entries = json.load(f)
    print(f"[keyframes] Loaded {len(entries)} subtitle entries", file=sys.stderr)

    # Select key moments
    keyframes = select_keyframes(entries, max_frames=args.max_frames)
    print(f"[keyframes] Selected {len(keyframes)} key moments:", file=sys.stderr)
    for kf in keyframes:
        print(f"  {kf['t']}  score={kf['score']}  \"{kf['text'][:60]}\"", file=sys.stderr)

    # Download sections and capture frames
    results = []
    with tempfile.TemporaryDirectory() as tmpdir:
        for i, kf in enumerate(keyframes):
            sec = kf["s"]
            print(f"[keyframes] Capturing frame {i+1}/{len(keyframes)} at {kf['t']}...", file=sys.stderr)

            video_path = os.path.join(tmpdir, f"clip_{i}.mp4")
            frame_path = os.path.join(tmpdir, f"frame_{i}.jpg")

            ok = download_section(args.url, sec, duration=4, out_path=video_path)
            if not ok:
                print(f"  [!] Failed to download section at {kf['t']}", file=sys.stderr)
                continue

            # The downloaded clip starts ~1s before our target; extract at offset=1s
            ok = extract_frame(video_path, offset_sec=1.5, out_path=frame_path)
            if not ok:
                print(f"  [!] Failed to extract frame at {kf['t']}", file=sys.stderr)
                continue

            b64 = image_to_b64(frame_path)
            results.append({
                "t": kf["t"],
                "s": kf["s"],
                "text": kf["text"],
                "score": kf["score"],
                "image_b64": b64,
            })
            print(f"  ✓ Captured ({len(b64)//1024}KB)", file=sys.stderr)

    print(f"[keyframes] Done: {len(results)}/{len(keyframes)} frames captured", file=sys.stderr)

    output = json.dumps(results, ensure_ascii=False, indent=2)
    if args.output_json:
        with open(args.output_json, "w") as f:
            f.write(output)
        print(f"[keyframes] Saved to: {args.output_json}", file=sys.stderr)
    else:
        print(output)


if __name__ == "__main__":
    main()