
YouTube Harvester Skill

Extract and ingest YouTube video transcripts into RAG with proper chunking and metadata.

Overview

YouTube is a rich source of tutorials, lectures, and explanations. This skill covers:

  • Transcript extraction (manual and auto-generated)

  • Timestamp-aware chunking

  • Playlist and channel harvesting

  • Metadata enrichment

Prerequisites

Install yt-dlp for video metadata and subtitles

pip install yt-dlp

Install youtube-transcript-api for cleaner transcript access

pip install youtube-transcript-api

Optional: for audio transcription fallback

pip install openai-whisper
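
If a video has no captions at all, a minimal fallback is to download the audio with yt-dlp and transcribe it locally. A sketch, assuming openai-whisper is installed; the transcribe_with_whisper helper is illustrative, not part of this skill:

import subprocess
import tempfile

import whisper  # openai-whisper

def transcribe_with_whisper(video_id: str, model_name: str = "base") -> list:
    """Download audio via yt-dlp and transcribe it locally with Whisper."""
    with tempfile.TemporaryDirectory() as tmp:
        subprocess.run(
            ['yt-dlp', '-x', '--audio-format', 'mp3',
             '-o', f'{tmp}/{video_id}.%(ext)s',
             f'https://youtube.com/watch?v={video_id}'],
            check=True
        )
        model = whisper.load_model(model_name)
        result = model.transcribe(f'{tmp}/{video_id}.mp3')
        # Reshape Whisper segments to match the transcript format used below
        return [
            {'text': s['text'].strip(), 'start': s['start'],
             'duration': s['end'] - s['start']}
            for s in result['segments']
        ]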

Extraction Methods

Method 1: youtube-transcript-api (Recommended)

Best for clean transcript text with timestamps.

#!/usr/bin/env python3 """Extract YouTube transcripts using youtube-transcript-api."""

from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api.formatters import TextFormatter import json import re from typing import Dict, List, Optional from datetime import datetime

def extract_video_id(url: str) -> str:
    """Extract video ID from various YouTube URL formats."""
    patterns = [
        r'(?:v=|/v/|youtu\.be/)([a-zA-Z0-9_-]{11})',
        r'(?:embed/)([a-zA-Z0-9_-]{11})',
    ]
    for pattern in patterns:
        match = re.search(pattern, url)
        if match:
            return match.group(1)
    raise ValueError(f"Could not extract video ID from: {url}")

def get_transcript(video_id: str, languages: List[str] = ['en']) -> List[Dict]:
    """
    Fetch transcript for a video.

    Args:
        video_id: YouTube video ID
        languages: Preferred languages in order

    Returns:
        List of transcript segments with text, start, duration
    """
    try:
        # Try to get manual captions first
        transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

        try:
            transcript = transcript_list.find_manually_created_transcript(languages)
        except Exception:
            # Fall back to auto-generated captions
            transcript = transcript_list.find_generated_transcript(languages)

        return transcript.fetch()

    except Exception as e:
        print(f"Error fetching transcript: {e}")
        return []

def format_timestamp(seconds: float) -> str:
    """Convert seconds to HH:MM:SS format."""
    hours = int(seconds // 3600)
    minutes = int((seconds % 3600) // 60)
    secs = int(seconds % 60)

    if hours > 0:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"

def get_video_metadata(video_id: str) -> Dict:
    """Get video metadata using yt-dlp."""
    import subprocess

    result = subprocess.run(
        ['yt-dlp', '--dump-json', '--no-download',
         f'https://youtube.com/watch?v={video_id}'],
        capture_output=True,
        text=True
    )

    if result.returncode == 0:
        data = json.loads(result.stdout)
        return {
            'title': data.get('title'),
            'channel': data.get('channel'),
            'channel_id': data.get('channel_id'),
            'upload_date': data.get('upload_date'),
            'duration': data.get('duration'),
            'view_count': data.get('view_count'),
            'description': data.get('description', '')[:500],  # Truncate
            'tags': data.get('tags', [])[:10],  # Limit tags
        }
    return {}

Method 2: yt-dlp Subtitles

Better for batch processing and when API limits are hit.

#!/bin/bash
# Extract subtitles using yt-dlp

VIDEO_URL="$1"
OUTPUT_DIR="${2:-.}"

# Download auto-generated subtitles
yt-dlp \
    --write-auto-sub \
    --sub-lang en \
    --sub-format vtt \
    --skip-download \
    --output "$OUTPUT_DIR/%(title)s.%(ext)s" \
    "$VIDEO_URL"

# Convert VTT to plain text
for vtt in "$OUTPUT_DIR"/*.vtt; do
    # Remove VTT formatting, keep just the text
    sed -e '/^WEBVTT/d' \
        -e '/^Kind:/d' \
        -e '/^Language:/d' \
        -e '/^[0-9][0-9]:[0-9][0-9]/d' \
        -e '/-->/d' \
        -e 's/<[^>]*>//g' \
        -e '/^$/d' \
        "$vtt" > "${vtt%.vtt}.txt"
done

Chunking Strategies

Strategy 1: Time-Based Chunks

Split transcript into fixed time intervals.

def chunk_by_time(
    transcript: List[Dict],
    chunk_duration: int = 300  # 5 minutes
) -> List[Dict]:
    """
    Chunk transcript by time intervals.

    Args:
        transcript: List of transcript segments
        chunk_duration: Seconds per chunk
    """
    chunks = []
    current_chunk = {
        'text': '',
        'start': 0,
        'end': 0,
        'segments': []
    }

    for segment in transcript:
        segment_start = segment['start']

        # Check if we need to start a new chunk
        if segment_start >= current_chunk['start'] + chunk_duration:
            if current_chunk['text']:
                chunks.append(current_chunk)

            current_chunk = {
                'text': '',
                'start': segment_start,
                'end': segment_start,
                'segments': []
            }

        current_chunk['text'] += ' ' + segment['text']
        current_chunk['end'] = segment['start'] + segment.get('duration', 0)
        current_chunk['segments'].append(segment)

    # Don't forget the last chunk
    if current_chunk['text']:
        chunks.append(current_chunk)

    return chunks

Strategy 2: Topic-Based Chunks

Split when topic appears to change (silence gaps or topic markers).

def chunk_by_topic(
    transcript: List[Dict],
    gap_threshold: float = 5.0,  # Seconds of silence indicating topic change
    min_chunk_size: int = 100    # Minimum words per chunk
) -> List[Dict]:
    """
    Chunk transcript by topic boundaries.

    Uses gaps in speech and sentence boundaries to identify topic changes.
    """
    chunks = []
    current_chunk = {
        'text': '',
        'start': 0,
        'end': 0,
        'word_count': 0
    }

    prev_end = 0

    for segment in transcript:
        segment_start = segment['start']
        gap = segment_start - prev_end
        word_count = len(segment['text'].split())

        # Check for topic boundary
        is_boundary = (
            gap > gap_threshold and
            current_chunk['word_count'] >= min_chunk_size
        )

        if is_boundary:
            if current_chunk['text']:
                chunks.append(current_chunk)

            current_chunk = {
                'text': '',
                'start': segment_start,
                'end': segment_start,
                'word_count': 0
            }

        current_chunk['text'] += ' ' + segment['text']
        current_chunk['end'] = segment_start + segment.get('duration', 0)
        current_chunk['word_count'] += word_count
        prev_end = current_chunk['end']

    if current_chunk['text']:
        chunks.append(current_chunk)

    return chunks

Strategy 3: Semantic Chunks

Use embeddings to find natural topic boundaries.

def chunk_by_semantics(
    transcript: List[Dict],
    similarity_threshold: float = 0.7,
    window_size: int = 5
) -> List[Dict]:
    """
    Chunk based on semantic similarity between segments.

    Groups semantically similar consecutive segments together.
    """
    from sentence_transformers import SentenceTransformer
    import numpy as np

    model = SentenceTransformer('all-MiniLM-L6-v2')

    # Combine segments into windows for more stable embeddings
    windows = []
    for i in range(0, len(transcript), window_size):
        window_text = ' '.join(
            s['text'] for s in transcript[i:i+window_size]
        )
        windows.append({
            'text': window_text,
            'start': transcript[i]['start'],
            'end': transcript[min(i+window_size-1, len(transcript)-1)]['start'],
            'segments': transcript[i:i+window_size]
        })

    # Get embeddings
    embeddings = model.encode([w['text'] for w in windows])

    # Find boundaries where similarity drops
    chunks = []
    current_chunk = windows[0].copy() if windows else None

    for i in range(1, len(windows)):
        # Cosine similarity between adjacent windows
        similarity = np.dot(embeddings[i-1], embeddings[i]) / (
            np.linalg.norm(embeddings[i-1]) * np.linalg.norm(embeddings[i])
        )

        if similarity < similarity_threshold:
            # Topic boundary detected
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = windows[i].copy()
        else:
            # Continue current chunk
            current_chunk['text'] += ' ' + windows[i]['text']
            current_chunk['end'] = windows[i]['end']
            current_chunk['segments'].extend(windows[i]['segments'])

    if current_chunk:
        chunks.append(current_chunk)

    return chunks

Full Harvesting Pipeline

#!/usr/bin/env python3 """Complete YouTube harvesting pipeline."""

import json from typing import List, Dict, Optional from datetime import datetime

async def harvest_youtube_video(
    url: str,
    collection: str,
    chunk_strategy: str = "time",  # time, topic, semantic
    chunk_size: int = 300
) -> Dict:
    """
    Harvest a single YouTube video into RAG.

    Args:
        url: YouTube video URL
        collection: Target RAG collection
        chunk_strategy: How to chunk the transcript
        chunk_size: Size parameter for chunking

    Returns:
        Harvest report
    """
    video_id = extract_video_id(url)

    # Get metadata
    metadata = get_video_metadata(video_id)
    if not metadata:
        return {"status": "error", "error": "Could not fetch metadata"}

    # Get transcript
    transcript = get_transcript(video_id)
    if not transcript:
        return {"status": "error", "error": "No transcript available"}

    # Chunk based on strategy
    if chunk_strategy == "time":
        chunks = chunk_by_time(transcript, chunk_size)
    elif chunk_strategy == "topic":
        chunks = chunk_by_topic(transcript)
    elif chunk_strategy == "semantic":
        chunks = chunk_by_semantics(transcript)
    else:
        chunks = chunk_by_time(transcript, chunk_size)

    # Ingest each chunk (ingest() is the RAG ingestion call provided
    # by the host environment, not defined in this file)
    ingested = 0
    for i, chunk in enumerate(chunks):
        chunk_metadata = {
            "source_type": "youtube",
            "source_url": url,
            "video_id": video_id,
            "title": metadata.get("title"),
            "channel": metadata.get("channel"),
            "upload_date": metadata.get("upload_date"),
            "duration_seconds": metadata.get("duration"),
            "timestamp_start": format_timestamp(chunk["start"]),
            "timestamp_end": format_timestamp(chunk["end"]),
            "chunk_index": i,
            "total_chunks": len(chunks),
            "harvested_at": datetime.now().isoformat()
        }

        await ingest(
            content=chunk["text"].strip(),
            collection=collection,
            metadata=chunk_metadata,
            doc_id=f"yt_{video_id}_chunk_{i}"
        )
        ingested += 1

    return {
        "status": "success",
        "video_id": video_id,
        "title": metadata.get("title"),
        "chunks": ingested,
        "collection": collection
    }

async def harvest_youtube_playlist(
    playlist_url: str,
    collection: str,
    **kwargs
) -> Dict:
    """Harvest all videos in a playlist."""
    import subprocess

    # Get playlist video IDs
    result = subprocess.run(
        ['yt-dlp', '--flat-playlist', '--print', 'id', playlist_url],
        capture_output=True,
        text=True
    )

    video_ids = result.stdout.strip().split('\n')

    results = []
    for video_id in video_ids:
        url = f"https://youtube.com/watch?v={video_id}"
        result = await harvest_youtube_video(url, collection, **kwargs)
        results.append(result)

    success = sum(1 for r in results if r.get("status") == "success")

    return {
        "status": "success",
        "videos_processed": len(video_ids),
        "videos_succeeded": success,
        "videos_failed": len(video_ids) - success,
        "details": results
    }

async def harvest_youtube_channel(
    channel_url: str,
    collection: str,
    max_videos: int = 50,
    **kwargs
) -> Dict:
    """Harvest recent videos from a channel."""
    import subprocess

    # Get recent video IDs from the channel
    result = subprocess.run(
        ['yt-dlp', '--flat-playlist', '--print', 'id',
         '--playlist-end', str(max_videos), channel_url],
        capture_output=True,
        text=True
    )

    video_ids = result.stdout.strip().split('\n')

    results = []
    for video_id in video_ids:
        if video_id:
            url = f"https://youtube.com/watch?v={video_id}"
            result = await harvest_youtube_video(url, collection, **kwargs)
            results.append(result)

    success = sum(1 for r in results if r.get("status") == "success")

    return {
        "status": "success",
        "videos_processed": len(video_ids),
        "videos_succeeded": success,
        "collection": collection
    }

Metadata Schema

YouTube video chunk metadata

source_type: youtube
source_url: https://youtube.com/watch?v=...
video_id: dQw4w9WgXcQ
title: "Video Title"
channel: "Channel Name"
channel_id: UC...
upload_date: "20240101"
duration_seconds: 930
timestamp_start: "05:30"
timestamp_end: "10:00"
chunk_index: 2
total_chunks: 12
harvested_at: "2024-01-01T12:00:00Z"
tags: [tag1, tag2]
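
One payoff of storing timestamp_start per chunk is that answers can cite the exact moment in the video. A small sketch (assuming the HH:MM:SS strings produced by format_timestamp above; the helper name is illustrative):

def chunk_deep_link(video_id: str, timestamp_start: str) -> str:
    """Build a YouTube URL that starts playback at the chunk's timestamp."""
    seconds = 0
    for part in timestamp_start.split(':'):  # Works for MM:SS and HH:MM:SS
        seconds = seconds * 60 + int(part)
    return f"https://youtube.com/watch?v={video_id}&t={seconds}s"

# e.g. chunk_deep_link("dQw4w9WgXcQ", "05:30") -> ...watch?v=dQw4w9WgXcQ&t=330s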

Error Handling

Error                      Handling
No transcript available    Log, skip, note in report
Private/deleted video      Skip with error note
Age-restricted             May need authentication
Rate limited               Back off, retry with delay
API quota exceeded         Switch to yt-dlp method
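
For the rate-limit case, a simple exponential backoff before falling back to the yt-dlp method is usually enough. A sketch (the helper name and delays are arbitrary, and because get_transcript above returns [] on any failure, this naive version also retries videos that genuinely have no captions; inspecting the library's exception types would be tighter):

import time

def get_transcript_with_backoff(video_id: str, retries: int = 3,
                                base_delay: float = 2.0) -> list:
    """Retry get_transcript with exponential backoff on empty results."""
    for attempt in range(retries):
        transcript = get_transcript(video_id)
        if transcript:
            return transcript
        # Wait 2s, 4s, 8s... before the next attempt
        time.sleep(base_delay * (2 ** attempt))
    return []  # Caller can then fall back to yt-dlp subtitles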

Usage Examples

Single video

result = await harvest_youtube_video(
    url="https://youtube.com/watch?v=VIDEO_ID",
    collection="ml_tutorials",
    chunk_strategy="topic"
)

Playlist

result = await harvest_youtube_playlist(
    playlist_url="https://youtube.com/playlist?list=PLAYLIST_ID",
    collection="course_lectures",
    chunk_strategy="time",
    chunk_size=600  # 10-minute chunks
)

Channel (recent videos)

result = await harvest_youtube_channel(
    channel_url="https://youtube.com/@ChannelName",
    collection="channel_content",
    max_videos=20
)

Refinement Notes

Track improvements as you use this skill.

  • Transcript extraction tested

  • Chunking strategies compared

  • Metadata enrichment working

  • Playlist harvesting tested

  • Channel following implemented

  • Error handling robust
