media-expert

Expert guidance for media production, content management systems, video streaming, broadcasting systems, and modern media technology solutions.

Safety Notice

This listing is imported from the skills.sh public index metadata. Review the upstream SKILL.md and repository scripts before running anything.

Copy the command below and send it to your AI assistant to learn this skill:

Install skill "media-expert" with this command: npx skills add personamanagmentlayer/pcl/personamanagmentlayer-pcl-media-expert

Media Expert

Expert guidance for media production, content management systems, video streaming, broadcasting systems, and modern media technology solutions.

Core Concepts

Media Production

  • Video production workflows

  • Audio production and mixing

  • Post-production and editing

  • Visual effects (VFX)

  • Color grading and correction

  • Animation and motion graphics

  • Live production

Streaming and Broadcasting

  • Video streaming platforms

  • Content Delivery Networks (CDN)

  • Adaptive bitrate streaming

  • Live broadcasting

  • OTT (Over-the-Top) platforms

  • Digital rights management (DRM)

  • Transcoding and encoding

Technologies

  • Media Asset Management (MAM)

  • Digital Asset Management (DAM)

  • Broadcast automation

  • IP-based media production

  • Cloud production workflows

  • AI for content analysis

  • Virtual production

Standards and Protocols

  • SMPTE standards

  • HLS (HTTP Live Streaming)

  • DASH (Dynamic Adaptive Streaming over HTTP)

  • RTMP/RTSP protocols

  • NDI (Network Device Interface)

  • MXF (Material Exchange Format)

  • Metadata standards (Dublin Core, IPTC)
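
As a concrete illustration of the last item, descriptive metadata can be exported as Dublin Core elements. A minimal mapping sketch (the dc:* names are standard Dublin Core elements; the asset fields follow the MediaAsset dataclass defined below):

def to_dublin_core(asset) -> dict:
    """Map an asset's descriptive fields onto Dublin Core element names."""
    return {
        'dc:title': asset.title,
        'dc:description': asset.description,
        'dc:creator': asset.created_by,
        'dc:date': asset.created_at.isoformat(),
        'dc:format': asset.media_type.value,
        'dc:identifier': asset.asset_id,
    }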

Media Asset Management System

import hashlib
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional

class MediaType(Enum):
    VIDEO = "video"
    AUDIO = "audio"
    IMAGE = "image"
    DOCUMENT = "document"

class AssetStatus(Enum):
    DRAFT = "draft"
    IN_REVIEW = "in_review"
    APPROVED = "approved"
    PUBLISHED = "published"
    ARCHIVED = "archived"

@dataclass
class MediaAsset:
    """Media asset information"""
    asset_id: str
    title: str
    description: str
    media_type: MediaType
    file_path: str
    file_size_bytes: int
    duration_seconds: Optional[float]
    resolution: Optional[str]  # e.g., "1920x1080"
    codec: Optional[str]
    bitrate_kbps: Optional[int]
    frame_rate: Optional[float]
    created_at: datetime
    created_by: str
    status: AssetStatus
    tags: List[str]
    metadata: Dict[str, str]
    checksum: str

@dataclass
class ContentPackage:
    """Content package for distribution"""
    package_id: str
    title: str
    assets: List[str]  # Asset IDs
    created_at: datetime
    scheduled_publish: Optional[datetime]
    expiration_date: Optional[datetime]
    distribution_channels: List[str]

class MediaAssetManagementSystem:
    """Media asset management and workflow"""

def __init__(self):
    self.assets = {}
    self.packages = {}
    self.workflows = []

def ingest_asset(self, file_path: str, metadata: dict) -> MediaAsset:
    """Ingest media asset into system"""
    # Calculate checksum
    checksum = self._calculate_checksum(file_path)

    # Extract technical metadata
    tech_metadata = self._extract_metadata(file_path)

    asset = MediaAsset(
        asset_id=self._generate_asset_id(),
        title=metadata['title'],
        description=metadata.get('description', ''),
        media_type=MediaType(metadata['media_type']),
        file_path=file_path,
        file_size_bytes=tech_metadata['file_size'],
        duration_seconds=tech_metadata.get('duration'),
        resolution=tech_metadata.get('resolution'),
        codec=tech_metadata.get('codec'),
        bitrate_kbps=tech_metadata.get('bitrate'),
        frame_rate=tech_metadata.get('frame_rate'),
        created_at=datetime.now(),
        created_by=metadata['created_by'],
        status=AssetStatus.DRAFT,
        tags=metadata.get('tags', []),
        metadata=metadata.get('custom_metadata', {}),
        checksum=checksum
    )

    self.assets[asset.asset_id] = asset

    # Trigger automated workflows
    self._trigger_workflows(asset)

    return asset

def _extract_metadata(self, file_path: str) -> dict:
    """Extract technical metadata from media file"""
    # Would use ffprobe or similar tool
    # Simulated metadata
    return {
        'file_size': 1073741824,  # 1 GB
        'duration': 3600.0,  # 1 hour
        'resolution': '1920x1080',
        'codec': 'h264',
        'bitrate': 5000,
        'frame_rate': 29.97
    }
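
# A fuller sketch of real extraction, assuming ffprobe (shipped with FFmpeg)
# is installed and on PATH; field names follow ffprobe's JSON output.
def _extract_metadata_ffprobe(self, file_path: str) -> dict:
    import json
    import subprocess
    probe = json.loads(subprocess.run(
        ['ffprobe', '-v', 'quiet', '-print_format', 'json',
         '-show_format', '-show_streams', file_path],
        capture_output=True, text=True, check=True).stdout)
    fmt = probe['format']
    video = next((s for s in probe['streams']
                  if s.get('codec_type') == 'video'), None)
    meta = {
        'file_size': int(fmt['size']),
        'duration': float(fmt.get('duration', 0.0)),
        'bitrate': int(fmt['bit_rate']) // 1000 if 'bit_rate' in fmt else None,
    }
    if video:
        meta['resolution'] = f"{video['width']}x{video['height']}"
        meta['codec'] = video['codec_name']
        num, den = video.get('r_frame_rate', '0/1').split('/')  # e.g., "30000/1001"
        meta['frame_rate'] = round(int(num) / int(den), 3) if int(den) else None
    return meta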

def _calculate_checksum(self, file_path: str) -> str:
    """Calculate file checksum for integrity"""
    # In production, would read actual file
    return hashlib.sha256(file_path.encode()).hexdigest()
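
# A streaming sketch that hashes the actual file contents in fixed-size
# chunks, so multi-gigabyte media files never have to fit in memory.
def _calculate_file_checksum(self, file_path: str) -> str:
    digest = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(8 * 1024 * 1024), b''):  # 8 MiB reads
            digest.update(chunk)
    return digest.hexdigest()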

def transcode_asset(self, asset_id: str, output_profiles: List[dict]) -> dict:
    """Transcode asset to multiple formats"""
    asset = self.assets.get(asset_id)
    if not asset:
        return {'error': 'Asset not found'}

    transcode_jobs = []

    for profile in output_profiles:
        job = {
            'job_id': self._generate_job_id(),
            'asset_id': asset_id,
            'profile_name': profile['name'],
            'target_resolution': profile['resolution'],
            'target_bitrate': profile['bitrate'],
            'target_codec': profile['codec'],
            'status': 'queued',
            'progress_percent': 0,
            'estimated_completion': datetime.now() + timedelta(hours=1)
        }
        transcode_jobs.append(job)

    return {
        'asset_id': asset_id,
        'transcode_jobs': transcode_jobs,
        'total_jobs': len(transcode_jobs)
    }

def search_assets(self, query: dict) -> List[MediaAsset]:
    """Search assets by metadata"""
    results = []

    for asset in self.assets.values():
        match = True

        # Text search
        if 'keywords' in query:
            keywords = query['keywords'].lower()
            if keywords not in asset.title.lower() and keywords not in asset.description.lower():
                match = False

        # Media type filter
        if 'media_type' in query and asset.media_type.value != query['media_type']:
            match = False

        # Status filter
        if 'status' in query and asset.status.value != query['status']:
            match = False

        # Tag filter
        if 'tags' in query:
            required_tags = set(query['tags'])
            asset_tags = set(asset.tags)
            if not required_tags.issubset(asset_tags):
                match = False

        # Date range
        if 'created_after' in query and asset.created_at < query['created_after']:
            match = False

        if match:
            results.append(asset)

    return results

def create_content_package(self, package_data: dict) -> ContentPackage:
    """Create content package for distribution"""
    package = ContentPackage(
        package_id=self._generate_package_id(),
        title=package_data['title'],
        assets=package_data['asset_ids'],
        created_at=datetime.now(),
        scheduled_publish=package_data.get('scheduled_publish'),
        expiration_date=package_data.get('expiration_date'),
        distribution_channels=package_data['channels']
    )

    self.packages[package.package_id] = package

    return package

def analyze_content(self, asset_id: str) -> dict:
    """AI-powered content analysis"""
    asset = self.assets.get(asset_id)
    if not asset:
        return {'error': 'Asset not found'}

    # Simulate AI analysis
    analysis = {
        'asset_id': asset_id,
        'detected_objects': ['person', 'car', 'building'],
        'detected_scenes': ['outdoor', 'daytime', 'urban'],
        'faces_detected': 3,
        'speech_to_text': 'Transcribed content would appear here...',
        'sentiment': 'positive',
        'content_categories': ['news', 'documentary'],
        'suggested_tags': ['urban', 'interview', 'documentary'],
        'quality_score': 85.0
    }

    return analysis

def _trigger_workflows(self, asset: MediaAsset):
    """Trigger automated workflows for asset"""
    # Trigger proxy generation, thumbnails, etc.
    pass

def _generate_asset_id(self) -> str:
    return f"ASSET-{uuid.uuid4().hex[:12].upper()}"

def _generate_package_id(self) -> str:
    return f"PKG-{uuid.uuid4().hex[:8].upper()}"

def _generate_job_id(self) -> str:
    return f"JOB-{uuid.uuid4().hex[:8].upper()}"

Video Streaming Platform

class VideoStreamingPlatform:
    """Video streaming and delivery system"""

def __init__(self):
    self.streams = {}
    self.viewers = {}
    self.cdn_nodes = {}

def start_live_stream(self, stream_data: dict) -> dict:
    """Start live video stream"""
    stream_id = self._generate_stream_id()

    stream = {
        'stream_id': stream_id,
        'title': stream_data['title'],
        'description': stream_data.get('description', ''),
        'streamer_id': stream_data['streamer_id'],
        'status': 'live',
        'started_at': datetime.now(),
        'viewer_count': 0,
        'peak_viewers': 0,
        'ingest_url': f'rtmp://ingest.example.com/live/{stream_id}',
        'playback_urls': {
            'hls': f'https://cdn.example.com/live/{stream_id}/playlist.m3u8',
            'dash': f'https://cdn.example.com/live/{stream_id}/manifest.mpd'
        },
        'quality_profiles': ['1080p', '720p', '480p', '360p']
    }

    self.streams[stream_id] = stream

    return stream

def generate_adaptive_bitrate_manifest(self, asset_id: str) -> dict:
    """Generate ABR manifest for adaptive streaming"""
    # Generate HLS manifest
    hls_variants = [
        {
            'bandwidth': 5000000,  # 5 Mbps
            'resolution': '1920x1080',
            'codecs': 'avc1.640028,mp4a.40.2',
            'url': '1080p/playlist.m3u8'
        },
        {
            'bandwidth': 2800000,  # 2.8 Mbps
            'resolution': '1280x720',
            'codecs': 'avc1.64001f,mp4a.40.2',
            'url': '720p/playlist.m3u8'
        },
        {
            'bandwidth': 1400000,  # 1.4 Mbps
            'resolution': '854x480',
            'codecs': 'avc1.64001e,mp4a.40.2',
            'url': '480p/playlist.m3u8'
        },
        {
            'bandwidth': 800000,  # 800 Kbps
            'resolution': '640x360',
            'codecs': 'avc1.64001e,mp4a.40.2',
            'url': '360p/playlist.m3u8'
        }
    ]

    return {
        'asset_id': asset_id,
        'protocol': 'hls',
        'master_playlist_url': f'https://cdn.example.com/vod/{asset_id}/master.m3u8',
        'variants': hls_variants
    }
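
# The variants above map directly onto an HLS master playlist (RFC 8216);
# a minimal rendering sketch:
def render_hls_master_playlist(self, variants: List[dict]) -> str:
    lines = ['#EXTM3U', '#EXT-X-VERSION:3']
    for v in variants:
        lines.append(
            f"#EXT-X-STREAM-INF:BANDWIDTH={v['bandwidth']},"
            f"RESOLUTION={v['resolution']},CODECS=\"{v['codecs']}\""
        )
        lines.append(v['url'])
    return '\n'.join(lines) + '\n'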

def track_viewer_metrics(self, stream_id: str, viewer_id: str) -> dict:
    """Track viewer engagement metrics"""
    metrics = {
        'stream_id': stream_id,
        'viewer_id': viewer_id,
        'watch_time_seconds': 3600,
        'buffer_events': 2,
        'average_bitrate': 3500000,
        'quality_switches': 5,
        'playback_start_time_ms': 1200,
        'errors': 0,
        'device_type': 'desktop',
        'browser': 'chrome'
    }

    # Calculate Quality of Experience (QoE)
    qoe_score = self._calculate_qoe(metrics)
    metrics['qoe_score'] = qoe_score

    return metrics

def _calculate_qoe(self, metrics: dict) -> float:
    """Calculate Quality of Experience score"""
    score = 100.0

    # Penalize buffering
    score -= metrics['buffer_events'] * 5

    # Penalize startup time
    if metrics['playback_start_time_ms'] > 2000:
        score -= 10

    # Penalize errors
    score -= metrics['errors'] * 15

    return max(0.0, score)

def implement_drm(self, asset_id: str, drm_config: dict) -> dict:
    """Implement Digital Rights Management"""
    drm = {
        'asset_id': asset_id,
        'drm_systems': {
            'widevine': {
                'license_url': 'https://license.example.com/widevine',
                'supported_levels': ['L1', 'L3']
            },
            'fairplay': {
                'certificate_url': 'https://license.example.com/fairplay/cert',
                'license_url': 'https://license.example.com/fairplay/license'
            },
            'playready': {
                'license_url': 'https://license.example.com/playready'
            }
        },
        'encryption': 'AES-128-CTR',
        'key_rotation_interval': 3600  # seconds
    }

    return drm

def optimize_cdn_delivery(self, asset_id: str, viewer_location: tuple) -> dict:
    """Optimize CDN delivery based on viewer location"""
    # Find nearest CDN edge node
    nearest_node = self._find_nearest_cdn_node(viewer_location)

    return {
        'asset_id': asset_id,
        'cdn_node': nearest_node['node_id'],
        'cdn_location': nearest_node['location'],
        'distance_km': nearest_node['distance'],
        'estimated_latency_ms': nearest_node['latency'],
        'delivery_url': f"https://{nearest_node['node_id']}.cdn.example.com/{asset_id}"
    }

def _find_nearest_cdn_node(self, viewer_location: tuple) -> dict:
    """Find nearest CDN edge node to viewer"""
    # Would calculate actual distances to CDN nodes
    return {
        'node_id': 'edge-us-east-1',
        'location': 'Virginia, USA',
        'distance': 250,  # km
        'latency': 15  # ms
    }
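
# A concrete sketch of nearest-node selection using the haversine
# great-circle distance; the node coordinates below are hypothetical.
def _nearest_node_by_haversine(self, viewer_location: tuple) -> dict:
    import math
    nodes = {  # node_id -> (latitude, longitude), illustrative only
        'edge-us-east-1': (38.95, -77.45),
        'edge-eu-west-1': (53.33, -6.25),
        'edge-ap-northeast-1': (35.68, 139.77),
    }
    def haversine_km(a: tuple, b: tuple) -> float:
        lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
        h = (math.sin((lat2 - lat1) / 2) ** 2 +
             math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
        return 2 * 6371.0 * math.asin(math.sqrt(h))  # mean Earth radius in km
    node_id, coords = min(nodes.items(),
                          key=lambda kv: haversine_km(viewer_location, kv[1]))
    return {'node_id': node_id,
            'distance_km': round(haversine_km(viewer_location, coords))}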

def _generate_stream_id(self) -> str:
    return f"STREAM-{uuid.uuid4().hex[:8].upper()}"

Broadcast Automation

class BroadcastAutomationSystem:
    """Broadcast scheduling and automation"""

def __init__(self):
    self.schedule = []
    self.playlists = {}

def create_broadcast_schedule(self, channel: str, date: datetime, programming: List[dict]) -> dict:
    """Create daily broadcast schedule"""
    schedule_items = []
    current_time = date.replace(hour=0, minute=0, second=0, microsecond=0)

    for program in programming:
        item = {
            'channel': channel,
            'start_time': current_time,
            'end_time': current_time + timedelta(seconds=program['duration']),
            'program_title': program['title'],
            'asset_id': program['asset_id'],
            'type': program['type'],  # 'program', 'commercial', 'filler'
            'metadata': program.get('metadata', {})
        }

        schedule_items.append(item)
        current_time = item['end_time']

    self.schedule.extend(schedule_items)

    return {
        'channel': channel,
        'date': date.date().isoformat(),
        'total_items': len(schedule_items),
        'total_duration': int((schedule_items[-1]['end_time'] - schedule_items[0]['start_time']).total_seconds()),
        'schedule': schedule_items[:5]  # Return first 5 items
    }

def generate_playlist(self, schedule_id: str) -> dict:
    """Generate playout playlist"""
    # Convert schedule to playout format
    playlist = {
        'playlist_id': self._generate_playlist_id(),
        'format': 'xml',  # or 'json'
        'items': []
    }

    return playlist

def monitor_broadcast(self, channel: str) -> dict:
    """Monitor live broadcast status"""
    status = {
        'channel': channel,
        'on_air': True,
        'current_program': 'Evening News',
        'time_code': '00:15:32',
        'next_program': 'Sports Tonight',
        'next_program_in': 2728,  # seconds
        'signal_quality': {
            'video_ok': True,
            'audio_ok': True,
            'sync_ok': True
        },
        'alarms': []
    }

    return status

def _generate_playlist_id(self) -> str:
    return f"PLAY-{uuid.uuid4().hex[:8].upper()}"

Best Practices

Media Production

  • Use standardized workflows

  • Implement version control

  • Maintain proper backups

  • Use collaborative tools

  • Implement quality control

  • Document production processes

  • Use industry-standard formats

Content Management

  • Implement robust metadata schema

  • Use consistent naming conventions

  • Enable full-text search

  • Implement access controls

  • Maintain audit trails

  • Use automated workflows

  • Implement archival policies

Streaming Delivery

  • Use adaptive bitrate streaming

  • Implement CDN for global delivery

  • Monitor QoE metrics

  • Optimize for mobile devices

  • Implement DRM when required

  • Use low-latency protocols for live

  • Monitor buffer ratios

Broadcasting

  • Implement redundant systems

  • Automate scheduling

  • Monitor signal quality

  • Maintain emergency protocols

  • Use backup playout systems

  • Implement proper logging

  • Conduct regular testing

Anti-Patterns

❌ No backup systems

❌ Poor metadata management

❌ Single-bitrate streaming

❌ No CDN implementation

❌ Ignoring QoE metrics

❌ Manual scheduling processes

❌ No DRM for premium content

❌ Poor asset organization

❌ No disaster recovery plan
