Live Streaming Patterns
Best practices for live streaming to YouTube, Twitch, and other platforms.
Platform Configuration
YouTube Live
from googleapiclient.discovery import build from google.oauth2.credentials import Credentials
def create_youtube_broadcast( credentials: Credentials, title: str, description: str, scheduled_start: str, privacy: str = "unlisted" ): """Create a YouTube live broadcast.""" youtube = build('youtube', 'v3', credentials=credentials)
# Create broadcast
broadcast = youtube.liveBroadcasts().insert(
part="snippet,status,contentDetails",
body={
"snippet": {
"title": title,
"description": description,
"scheduledStartTime": scheduled_start
},
"status": {
"privacyStatus": privacy,
"selfDeclaredMadeForKids": False
},
"contentDetails": {
"enableAutoStart": True,
"enableAutoStop": True,
"enableDvr": True,
"recordFromStart": True
}
}
).execute()
# Create stream
stream = youtube.liveStreams().insert(
part="snippet,cdn",
body={
"snippet": {
"title": f"Stream for {title}"
},
"cdn": {
"frameRate": "60fps",
"ingestionType": "rtmp",
"resolution": "1080p"
}
}
).execute()
# Bind stream to broadcast
youtube.liveBroadcasts().bind(
part="id,contentDetails",
id=broadcast['id'],
streamId=stream['id']
).execute()
return {
"broadcast_id": broadcast['id'],
"stream_key": stream['cdn']['ingestionInfo']['streamName'],
"rtmp_url": stream['cdn']['ingestionInfo']['ingestionAddress'],
"watch_url": f"https://youtube.com/watch?v={broadcast['id']}"
}
def transition_broadcast(credentials: Credentials, broadcast_id: str, status: str): """Transition broadcast status: testing, live, complete.""" youtube = build('youtube', 'v3', credentials=credentials)
return youtube.liveBroadcasts().transition(
broadcastStatus=status,
id=broadcast_id,
part="status"
).execute()
Twitch
import requests
class TwitchAPI: def init(self, client_id: str, access_token: str): self.client_id = client_id self.access_token = access_token self.base_url = "https://api.twitch.tv/helix" self.headers = { "Client-ID": client_id, "Authorization": f"Bearer {access_token}" }
def get_stream_key(self, broadcaster_id: str) -> str:
"""Get stream key for broadcaster."""
response = requests.get(
f"{self.base_url}/streams/key",
headers=self.headers,
params={"broadcaster_id": broadcaster_id}
)
return response.json()['data'][0]['stream_key']
def update_stream_info(
self,
broadcaster_id: str,
title: str,
game_id: str = None,
language: str = "en"
):
"""Update stream title and category."""
data = {
"broadcaster_id": broadcaster_id,
"title": title,
"broadcaster_language": language
}
if game_id:
data["game_id"] = game_id
return requests.patch(
f"{self.base_url}/channels",
headers=self.headers,
json=data
)
def get_stream_status(self, user_login: str) -> dict:
"""Check if channel is live."""
response = requests.get(
f"{self.base_url}/streams",
headers=self.headers,
params={"user_login": user_login}
)
data = response.json()['data']
return data[0] if data else None
def create_clip(self, broadcaster_id: str) -> dict:
"""Create clip from live stream."""
response = requests.post(
f"{self.base_url}/clips",
headers=self.headers,
params={"broadcaster_id": broadcaster_id}
)
return response.json()['data'][0]
RTMP Streaming
FFmpeg RTMP Push
Stream to YouTube
ffmpeg -re -i input.mp4
-c:v libx264 -preset veryfast -maxrate 4500k -bufsize 9000k
-pix_fmt yuv420p -g 60
-c:a aac -b:a 160k -ar 44100
-f flv "rtmp://a.rtmp.youtube.com/live2/YOUR_STREAM_KEY"
Stream to Twitch
ffmpeg -re -i input.mp4
-c:v libx264 -preset veryfast -maxrate 6000k -bufsize 12000k
-pix_fmt yuv420p -g 60
-c:a aac -b:a 160k -ar 44100
-f flv "rtmp://live.twitch.tv/app/YOUR_STREAM_KEY"
Stream desktop (macOS)
ffmpeg -f avfoundation -framerate 30 -i "1:0"
-c:v libx264 -preset ultrafast -tune zerolatency
-c:a aac -b:a 128k
-f flv "rtmp://destination/stream_key"
Stream desktop (Linux)
ffmpeg -f x11grab -framerate 30 -video_size 1920x1080 -i :0.0
-f pulse -i default
-c:v libx264 -preset ultrafast -tune zerolatency
-c:a aac -b:a 128k
-f flv "rtmp://destination/stream_key"
Multi-Platform Streaming
Using tee muxer to stream to multiple platforms
ffmpeg -re -i input.mp4
-c:v libx264 -preset veryfast -b:v 4500k
-c:a aac -b:a 160k
-f tee -map 0:v -map 0:a
"[f=flv]rtmp://a.rtmp.youtube.com/live2/YT_KEY|
[f=flv]rtmp://live.twitch.tv/app/TWITCH_KEY|
[f=flv]rtmp://live-api-s.facebook.com:443/rtmp/FB_KEY"
Python RTMP Handler
import subprocess from dataclasses import dataclass from typing import List, Optional
@dataclass class StreamDestination: name: str rtmp_url: str stream_key: str
@property
def full_url(self) -> str:
return f"{self.rtmp_url}/{self.stream_key}"
class MultiStreamer: def init( self, input_source: str, destinations: List[StreamDestination], video_bitrate: str = "4500k", audio_bitrate: str = "160k" ): self.input_source = input_source self.destinations = destinations self.video_bitrate = video_bitrate self.audio_bitrate = audio_bitrate self.process: Optional[subprocess.Popen] = None
def build_command(self) -> List[str]:
"""Build FFmpeg command for multi-platform streaming."""
cmd = [
"ffmpeg",
"-re", "-i", self.input_source,
"-c:v", "libx264",
"-preset", "veryfast",
"-b:v", self.video_bitrate,
"-maxrate", self.video_bitrate,
"-bufsize", str(int(self.video_bitrate[:-1]) * 2) + "k",
"-pix_fmt", "yuv420p",
"-g", "60",
"-c:a", "aac",
"-b:a", self.audio_bitrate,
"-ar", "44100"
]
if len(self.destinations) == 1:
cmd.extend(["-f", "flv", self.destinations[0].full_url])
else:
# Use tee muxer for multiple destinations
tee_outputs = "|".join(
f"[f=flv]{dest.full_url}" for dest in self.destinations
)
cmd.extend([
"-f", "tee",
"-map", "0:v", "-map", "0:a",
tee_outputs
])
return cmd
def start(self):
"""Start streaming."""
cmd = self.build_command()
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
def stop(self):
"""Stop streaming."""
if self.process:
self.process.terminate()
self.process.wait()
OBS WebSocket Integration
import obswebsocket from obswebsocket import obsws, requests as obs_requests
class OBSController: def init(self, host: str = "localhost", port: int = 4455, password: str = ""): self.ws = obsws(host, port, password)
def connect(self):
self.ws.connect()
def disconnect(self):
self.ws.disconnect()
def start_streaming(self):
"""Start OBS streaming."""
self.ws.call(obs_requests.StartStream())
def stop_streaming(self):
"""Stop OBS streaming."""
self.ws.call(obs_requests.StopStream())
def start_recording(self):
"""Start OBS recording."""
self.ws.call(obs_requests.StartRecord())
def stop_recording(self):
"""Stop OBS recording."""
self.ws.call(obs_requests.StopRecord())
def switch_scene(self, scene_name: str):
"""Switch to a different scene."""
self.ws.call(obs_requests.SetCurrentProgramScene(sceneName=scene_name))
def get_scenes(self) -> list:
"""Get list of available scenes."""
response = self.ws.call(obs_requests.GetSceneList())
return [scene['sceneName'] for scene in response.getScenes()]
def set_source_visibility(self, scene: str, source: str, visible: bool):
"""Show or hide a source in a scene."""
self.ws.call(obs_requests.SetSceneItemEnabled(
sceneName=scene,
sceneItemId=self._get_source_id(scene, source),
sceneItemEnabled=visible
))
def _get_source_id(self, scene: str, source: str) -> int:
"""Get source ID by name."""
response = self.ws.call(obs_requests.GetSceneItemId(
sceneName=scene,
sourceName=source
))
return response.getSceneItemId()
def set_stream_settings(self, server: str, key: str):
"""Update stream settings."""
self.ws.call(obs_requests.SetStreamServiceSettings(
streamServiceType="rtmp_common",
streamServiceSettings={
"server": server,
"key": key
}
))
Stream Automation
Scheduled Stream
import asyncio from datetime import datetime, timedelta from typing import Callable
class StreamScheduler: def init(self): self.scheduled_streams = []
async def schedule_stream(
self,
start_time: datetime,
duration: timedelta,
start_callback: Callable,
stop_callback: Callable
):
"""Schedule a stream for a specific time."""
now = datetime.now()
delay = (start_time - now).total_seconds()
if delay > 0:
await asyncio.sleep(delay)
# Start stream
await start_callback()
# Wait for duration
await asyncio.sleep(duration.total_seconds())
# Stop stream
await stop_callback()
Usage
async def main(): scheduler = StreamScheduler() obs = OBSController() obs.connect()
start_time = datetime.now() + timedelta(minutes=5)
duration = timedelta(hours=2)
await scheduler.schedule_stream(
start_time=start_time,
duration=duration,
start_callback=lambda: obs.start_streaming(),
stop_callback=lambda: obs.stop_streaming()
)
Chat Bot Integration
from twitchio.ext import commands
class StreamBot(commands.Bot): def init(self, token: str, prefix: str, channels: list): super().init(token=token, prefix=prefix, initial_channels=channels) self.obs = OBSController() self.obs.connect()
async def event_ready(self):
print(f'Bot is ready | {self.nick}')
async def event_message(self, message):
if message.echo:
return
await self.handle_commands(message)
@commands.command(name='scene')
async def scene_command(self, ctx, scene_name: str):
"""Switch OBS scene via chat command."""
if ctx.author.is_mod:
try:
self.obs.switch_scene(scene_name)
await ctx.send(f"Switched to scene: {scene_name}")
except Exception as e:
await ctx.send(f"Error switching scene: {e}")
@commands.command(name='brb')
async def brb_command(self, ctx):
"""Switch to BRB scene."""
if ctx.author.is_mod:
self.obs.switch_scene("BRB")
await ctx.send("Be right back!")
@commands.command(name='back')
async def back_command(self, ctx):
"""Switch back to main scene."""
if ctx.author.is_mod:
self.obs.switch_scene("Main")
await ctx.send("We're back!")
Stream Quality Presets
from dataclasses import dataclass from enum import Enum
class StreamQuality(Enum): LOW = "480p" MEDIUM = "720p" HIGH = "1080p" ULTRA = "1440p"
@dataclass class EncodingPreset: resolution: str video_bitrate: str audio_bitrate: str framerate: int preset: str
QUALITY_PRESETS = { StreamQuality.LOW: EncodingPreset( resolution="854x480", video_bitrate="1500k", audio_bitrate="96k", framerate=30, preset="veryfast" ), StreamQuality.MEDIUM: EncodingPreset( resolution="1280x720", video_bitrate="3000k", audio_bitrate="128k", framerate=30, preset="veryfast" ), StreamQuality.HIGH: EncodingPreset( resolution="1920x1080", video_bitrate="4500k", audio_bitrate="160k", framerate=60, preset="veryfast" ), StreamQuality.ULTRA: EncodingPreset( resolution="2560x1440", video_bitrate="9000k", audio_bitrate="192k", framerate=60, preset="fast" ) }
Health Monitoring
import asyncio from dataclasses import dataclass from datetime import datetime
@dataclass class StreamHealth: bitrate: float dropped_frames: int fps: float cpu_usage: float timestamp: datetime
class StreamMonitor: def init(self, obs: OBSController): self.obs = obs self.health_history: list[StreamHealth] = []
async def monitor(self, interval: float = 5.0):
"""Continuously monitor stream health."""
while True:
try:
stats = self.obs.ws.call(obs_requests.GetStats())
health = StreamHealth(
bitrate=stats.getKbitsPerSec(),
dropped_frames=stats.getOutputSkippedFrames(),
fps=stats.getActiveFps(),
cpu_usage=stats.getCpuUsage(),
timestamp=datetime.now()
)
self.health_history.append(health)
# Alert on issues
if health.dropped_frames > 100:
print(f"Warning: High dropped frames: {health.dropped_frames}")
if health.fps < 25:
print(f"Warning: Low FPS: {health.fps}")
except Exception as e:
print(f"Monitor error: {e}")
await asyncio.sleep(interval)
References
-
YouTube Live Streaming API
-
Twitch API Documentation
-
OBS WebSocket Protocol
-
FFmpeg Streaming Guide