streaming-patterns

Live Streaming Patterns

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "streaming-patterns" with this command: npx skills add mindmorass/reflex/mindmorass-reflex-streaming-patterns

Live Streaming Patterns

Best practices for live streaming to YouTube, Twitch, and other platforms.

Platform Configuration

YouTube Live

from googleapiclient.discovery import build from google.oauth2.credentials import Credentials

def create_youtube_broadcast( credentials: Credentials, title: str, description: str, scheduled_start: str, privacy: str = "unlisted" ): """Create a YouTube live broadcast.""" youtube = build('youtube', 'v3', credentials=credentials)

# Create broadcast
broadcast = youtube.liveBroadcasts().insert(
    part="snippet,status,contentDetails",
    body={
        "snippet": {
            "title": title,
            "description": description,
            "scheduledStartTime": scheduled_start
        },
        "status": {
            "privacyStatus": privacy,
            "selfDeclaredMadeForKids": False
        },
        "contentDetails": {
            "enableAutoStart": True,
            "enableAutoStop": True,
            "enableDvr": True,
            "recordFromStart": True
        }
    }
).execute()

# Create stream
stream = youtube.liveStreams().insert(
    part="snippet,cdn",
    body={
        "snippet": {
            "title": f"Stream for {title}"
        },
        "cdn": {
            "frameRate": "60fps",
            "ingestionType": "rtmp",
            "resolution": "1080p"
        }
    }
).execute()

# Bind stream to broadcast
youtube.liveBroadcasts().bind(
    part="id,contentDetails",
    id=broadcast['id'],
    streamId=stream['id']
).execute()

return {
    "broadcast_id": broadcast['id'],
    "stream_key": stream['cdn']['ingestionInfo']['streamName'],
    "rtmp_url": stream['cdn']['ingestionInfo']['ingestionAddress'],
    "watch_url": f"https://youtube.com/watch?v={broadcast['id']}"
}

def transition_broadcast(credentials: Credentials, broadcast_id: str, status: str): """Transition broadcast status: testing, live, complete.""" youtube = build('youtube', 'v3', credentials=credentials)

return youtube.liveBroadcasts().transition(
    broadcastStatus=status,
    id=broadcast_id,
    part="status"
).execute()

Twitch

import requests

class TwitchAPI: def init(self, client_id: str, access_token: str): self.client_id = client_id self.access_token = access_token self.base_url = "https://api.twitch.tv/helix" self.headers = { "Client-ID": client_id, "Authorization": f"Bearer {access_token}" }

def get_stream_key(self, broadcaster_id: str) -> str:
    """Get stream key for broadcaster."""
    response = requests.get(
        f"{self.base_url}/streams/key",
        headers=self.headers,
        params={"broadcaster_id": broadcaster_id}
    )
    return response.json()['data'][0]['stream_key']

def update_stream_info(
    self,
    broadcaster_id: str,
    title: str,
    game_id: str = None,
    language: str = "en"
):
    """Update stream title and category."""
    data = {
        "broadcaster_id": broadcaster_id,
        "title": title,
        "broadcaster_language": language
    }
    if game_id:
        data["game_id"] = game_id

    return requests.patch(
        f"{self.base_url}/channels",
        headers=self.headers,
        json=data
    )

def get_stream_status(self, user_login: str) -> dict:
    """Check if channel is live."""
    response = requests.get(
        f"{self.base_url}/streams",
        headers=self.headers,
        params={"user_login": user_login}
    )
    data = response.json()['data']
    return data[0] if data else None

def create_clip(self, broadcaster_id: str) -> dict:
    """Create clip from live stream."""
    response = requests.post(
        f"{self.base_url}/clips",
        headers=self.headers,
        params={"broadcaster_id": broadcaster_id}
    )
    return response.json()['data'][0]

RTMP Streaming

FFmpeg RTMP Push

Stream to YouTube

ffmpeg -re -i input.mp4
-c:v libx264 -preset veryfast -maxrate 4500k -bufsize 9000k
-pix_fmt yuv420p -g 60
-c:a aac -b:a 160k -ar 44100
-f flv "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY"

Stream to Twitch

ffmpeg -re -i input.mp4
-c:v libx264 -preset veryfast -maxrate 6000k -bufsize 12000k
-pix_fmt yuv420p -g 60
-c:a aac -b:a 160k -ar 44100
-f flv "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY"

Stream desktop (macOS)

ffmpeg -f avfoundation -framerate 30 -i "1:0"
-c:v libx264 -preset ultrafast -tune zerolatency
-c:a aac -b:a 128k
-f flv "rtmp://destination/stream_key"

Stream desktop (Linux)

ffmpeg -f x11grab -framerate 30 -video_size 1920x1080 -i :0.0
-f pulse -i default
-c:v libx264 -preset ultrafast -tune zerolatency
-c:a aac -b:a 128k
-f flv "rtmp://destination/stream_key"

Multi-Platform Streaming

Using tee muxer to stream to multiple platforms

ffmpeg -re -i input.mp4
-c:v libx264 -preset veryfast -b:v 4500k
-c:a aac -b:a 160k
-f tee -map 0:v -map 0:a
"[f=flv]rtmp://a.rtmp.youtube.com/live2/YT_KEY|
[f=flv]rtmp://live.twitch.tv/app/TWITCH_KEY|
[f=flv]rtmp://live-api-s.facebook.com:443/rtmp/FB_KEY"

Python RTMP Handler

import subprocess from dataclasses import dataclass from typing import List, Optional

@dataclass class StreamDestination: name: str rtmp_url: str stream_key: str

@property
def full_url(self) -> str:
    return f"{self.rtmp_url}/{self.stream_key}"

class MultiStreamer: def init( self, input_source: str, destinations: List[StreamDestination], video_bitrate: str = "4500k", audio_bitrate: str = "160k" ): self.input_source = input_source self.destinations = destinations self.video_bitrate = video_bitrate self.audio_bitrate = audio_bitrate self.process: Optional[subprocess.Popen] = None

def build_command(self) -> List[str]:
    """Build FFmpeg command for multi-platform streaming."""
    cmd = [
        "ffmpeg",
        "-re", "-i", self.input_source,
        "-c:v", "libx264",
        "-preset", "veryfast",
        "-b:v", self.video_bitrate,
        "-maxrate", self.video_bitrate,
        "-bufsize", str(int(self.video_bitrate[:-1]) * 2) + "k",
        "-pix_fmt", "yuv420p",
        "-g", "60",
        "-c:a", "aac",
        "-b:a", self.audio_bitrate,
        "-ar", "44100"
    ]

    if len(self.destinations) == 1:
        cmd.extend(["-f", "flv", self.destinations[0].full_url])
    else:
        # Use tee muxer for multiple destinations
        tee_outputs = "|".join(
            f"[f=flv]{dest.full_url}" for dest in self.destinations
        )
        cmd.extend([
            "-f", "tee",
            "-map", "0:v", "-map", "0:a",
            tee_outputs
        ])

    return cmd

def start(self):
    """Start streaming."""
    cmd = self.build_command()
    self.process = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

def stop(self):
    """Stop streaming."""
    if self.process:
        self.process.terminate()
        self.process.wait()

OBS WebSocket Integration

import obswebsocket from obswebsocket import obsws, requests as obs_requests

class OBSController: def init(self, host: str = "localhost", port: int = 4455, password: str = ""): self.ws = obsws(host, port, password)

def connect(self):
    self.ws.connect()

def disconnect(self):
    self.ws.disconnect()

def start_streaming(self):
    """Start OBS streaming."""
    self.ws.call(obs_requests.StartStream())

def stop_streaming(self):
    """Stop OBS streaming."""
    self.ws.call(obs_requests.StopStream())

def start_recording(self):
    """Start OBS recording."""
    self.ws.call(obs_requests.StartRecord())

def stop_recording(self):
    """Stop OBS recording."""
    self.ws.call(obs_requests.StopRecord())

def switch_scene(self, scene_name: str):
    """Switch to a different scene."""
    self.ws.call(obs_requests.SetCurrentProgramScene(sceneName=scene_name))

def get_scenes(self) -> list:
    """Get list of available scenes."""
    response = self.ws.call(obs_requests.GetSceneList())
    return [scene['sceneName'] for scene in response.getScenes()]

def set_source_visibility(self, scene: str, source: str, visible: bool):
    """Show or hide a source in a scene."""
    self.ws.call(obs_requests.SetSceneItemEnabled(
        sceneName=scene,
        sceneItemId=self._get_source_id(scene, source),
        sceneItemEnabled=visible
    ))

def _get_source_id(self, scene: str, source: str) -> int:
    """Get source ID by name."""
    response = self.ws.call(obs_requests.GetSceneItemId(
        sceneName=scene,
        sourceName=source
    ))
    return response.getSceneItemId()

def set_stream_settings(self, server: str, key: str):
    """Update stream settings."""
    self.ws.call(obs_requests.SetStreamServiceSettings(
        streamServiceType="rtmp_common",
        streamServiceSettings={
            "server": server,
            "key": key
        }
    ))

Stream Automation

Scheduled Stream

import asyncio from datetime import datetime, timedelta from typing import Callable

class StreamScheduler: def init(self): self.scheduled_streams = []

async def schedule_stream(
    self,
    start_time: datetime,
    duration: timedelta,
    start_callback: Callable,
    stop_callback: Callable
):
    """Schedule a stream for a specific time."""
    now = datetime.now()
    delay = (start_time - now).total_seconds()

    if delay > 0:
        await asyncio.sleep(delay)

    # Start stream
    await start_callback()

    # Wait for duration
    await asyncio.sleep(duration.total_seconds())

    # Stop stream
    await stop_callback()

Usage

async def main(): scheduler = StreamScheduler() obs = OBSController() obs.connect()

start_time = datetime.now() + timedelta(minutes=5)
duration = timedelta(hours=2)

await scheduler.schedule_stream(
    start_time=start_time,
    duration=duration,
    start_callback=lambda: obs.start_streaming(),
    stop_callback=lambda: obs.stop_streaming()
)

Chat Bot Integration

from twitchio.ext import commands

class StreamBot(commands.Bot): def init(self, token: str, prefix: str, channels: list): super().init(token=token, prefix=prefix, initial_channels=channels) self.obs = OBSController() self.obs.connect()

async def event_ready(self):
    print(f'Bot is ready | {self.nick}')

async def event_message(self, message):
    if message.echo:
        return
    await self.handle_commands(message)

@commands.command(name='scene')
async def scene_command(self, ctx, scene_name: str):
    """Switch OBS scene via chat command."""
    if ctx.author.is_mod:
        try:
            self.obs.switch_scene(scene_name)
            await ctx.send(f"Switched to scene: {scene_name}")
        except Exception as e:
            await ctx.send(f"Error switching scene: {e}")

@commands.command(name='brb')
async def brb_command(self, ctx):
    """Switch to BRB scene."""
    if ctx.author.is_mod:
        self.obs.switch_scene("BRB")
        await ctx.send("Be right back!")

@commands.command(name='back')
async def back_command(self, ctx):
    """Switch back to main scene."""
    if ctx.author.is_mod:
        self.obs.switch_scene("Main")
        await ctx.send("We're back!")

Stream Quality Presets

from dataclasses import dataclass from enum import Enum

class StreamQuality(Enum): LOW = "480p" MEDIUM = "720p" HIGH = "1080p" ULTRA = "1440p"

@dataclass class EncodingPreset: resolution: str video_bitrate: str audio_bitrate: str framerate: int preset: str

QUALITY_PRESETS = { StreamQuality.LOW: EncodingPreset( resolution="854x480", video_bitrate="1500k", audio_bitrate="96k", framerate=30, preset="veryfast" ), StreamQuality.MEDIUM: EncodingPreset( resolution="1280x720", video_bitrate="3000k", audio_bitrate="128k", framerate=30, preset="veryfast" ), StreamQuality.HIGH: EncodingPreset( resolution="1920x1080", video_bitrate="4500k", audio_bitrate="160k", framerate=60, preset="veryfast" ), StreamQuality.ULTRA: EncodingPreset( resolution="2560x1440", video_bitrate="9000k", audio_bitrate="192k", framerate=60, preset="fast" ) }

Health Monitoring

import asyncio from dataclasses import dataclass from datetime import datetime

@dataclass class StreamHealth: bitrate: float dropped_frames: int fps: float cpu_usage: float timestamp: datetime

class StreamMonitor: def init(self, obs: OBSController): self.obs = obs self.health_history: list[StreamHealth] = []

async def monitor(self, interval: float = 5.0):
    """Continuously monitor stream health."""
    while True:
        try:
            stats = self.obs.ws.call(obs_requests.GetStats())
            health = StreamHealth(
                bitrate=stats.getKbitsPerSec(),
                dropped_frames=stats.getOutputSkippedFrames(),
                fps=stats.getActiveFps(),
                cpu_usage=stats.getCpuUsage(),
                timestamp=datetime.now()
            )
            self.health_history.append(health)

            # Alert on issues
            if health.dropped_frames > 100:
                print(f"Warning: High dropped frames: {health.dropped_frames}")
            if health.fps < 25:
                print(f"Warning: Low FPS: {health.fps}")

        except Exception as e:
            print(f"Monitor error: {e}")

        await asyncio.sleep(interval)

References

  • YouTube Live Streaming API

  • Twitch API Documentation

  • OBS WebSocket Protocol

  • FFmpeg Streaming Guide

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

ffmpeg-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

site-crawler

No summary provided by upstream source.

Repository SourceNeeds Review
General

ai-video-generation

No summary provided by upstream source.

Repository SourceNeeds Review