continuous-monitoring

Real-time monitoring and detection of adversarial attacks and model drift in production

Safety Notice

This listing is imported from the skills.sh public index metadata. Review the upstream SKILL.md and repository scripts before running them.

Copy the command below and send it to your AI assistant to install this skill

Install skill "continuous-monitoring" with this command: npx skills add pluginagentmarketplace/custom-plugin-ai-red-teaming/pluginagentmarketplace-custom-plugin-ai-red-teaming-continuous-monitoring

Continuous Monitoring

Implement real-time detection of adversarial attacks and model degradation in production AI systems.

Quick Reference

Skill:       continuous-monitoring
Agent:       05-defense-strategy-developer
OWASP:       LLM10 (Unbounded Consumption), LLM02 (Sensitive Information Disclosure)
NIST:        Measure, Manage
Use Case:    Detect attacks and drift in production

Monitoring Architecture

User Input → [Input Monitor] → [Model] → [Output Monitor] → Response
                  ↓                              ↓
            [Anomaly Detection]          [Quality Check]
                  ↓                              ↓
            [Alert System] ←←←←←←←←←←←←←←←←←←←←←←
                  ↓
            [Incident Response]
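
A minimal sketch of how these stages might be wired together on the request path. The monitor interfaces match the detector classes defined in the sections below; `model` and `alert_sink` are assumed stand-ins for your serving stack, not a defined API.

import time

def monitored_inference(model, input_monitor, output_monitor,
                        alert_sink, user_input, embedding):
    # Input side: OOD and injection checks before the model runs
    for alert in (input_monitor.detect(embedding),
                  input_monitor.detect_injection(user_input)):
        if alert is not None:
            alert_sink(alert)  # route to the alert system / incident response

    start = time.time()
    response, metadata = model(user_input)
    metadata.latency_ms = (time.time() - start) * 1000

    # Output side: quality checks on the generated response
    for alert in output_monitor.check(response, metadata):
        alert_sink(alert)

    return response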

Detection Categories

1. Input Anomaly Detection

Category: input_anomaly
Latency Impact: 10-20ms
Detection Rate: 85-95%
import re
import numpy as np

class InputAnomalyDetector:
    def __init__(self, training_distribution):
        self.mean = training_distribution.mean
        self.cov = training_distribution.covariance
        # Invert the covariance once at startup; per-request inversion is wasteful
        self.cov_inv = np.linalg.inv(self.cov)
        self.threshold = 3.0  # Standard deviations

    def detect(self, input_embedding):
        # Mahalanobis distance from the training distribution
        diff = input_embedding - self.mean
        distance = np.sqrt(diff.T @ self.cov_inv @ diff)

        if distance > self.threshold:
            return AnomalyAlert(
                type="out_of_distribution",
                score=distance,
                severity=self._classify_severity(distance)
            )
        return None

    def detect_injection(self, text_input):
        # Pattern-based injection detection
        injection_patterns = [
            r'ignore\s+(previous|all)\s+instructions',
            r'system\s*:\s*',
            r'(admin|developer)\s+mode',
        ]
        for pattern in injection_patterns:
            if re.search(pattern, text_input, re.I):
                return AnomalyAlert(type="injection_attempt", severity="HIGH")
        return None
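
`AnomalyAlert` and `_classify_severity` are referenced but not defined above; a minimal sketch, assuming severities track the warning/critical thresholds from the Alert Configuration section (both definitions are illustrative):

from dataclasses import dataclass
from typing import Optional

@dataclass
class AnomalyAlert:
    type: str
    severity: str
    score: Optional[float] = None  # not every alert carries a score

def classify_severity(distance, warning=2.5, critical=4.0):
    # Map a Mahalanobis distance onto the alert ladder
    if distance >= critical:
        return "CRITICAL"
    if distance >= warning:
        return "WARNING"
    return "INFO"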

2. Output Quality Monitoring

Category: output_quality
Metrics: [confidence, coherence, toxicity, latency]
class OutputQualityMonitor:
    def __init__(self, config, toxicity_classifier, coherence_scorer):
        # Scoring models are injected so they can be swapped or mocked in tests
        self.toxicity_classifier = toxicity_classifier
        self.coherence_scorer = coherence_scorer
        self.confidence_threshold = config.get('confidence', 0.5)
        self.toxicity_threshold = config.get('toxicity', 0.1)
        self.latency_threshold_ms = config.get('latency', 5000)
        self.coherence_threshold = config.get('coherence', 0.7)

    def check(self, response, metadata):
        alerts = []

        # Low confidence check
        if metadata.confidence < self.confidence_threshold:
            alerts.append(Alert("low_confidence", metadata.confidence))

        # Toxicity check
        toxicity_score = self.toxicity_classifier(response)
        if toxicity_score > self.toxicity_threshold:
            alerts.append(Alert("high_toxicity", toxicity_score))

        # Latency check
        if metadata.latency_ms > self.latency_threshold_ms:
            alerts.append(Alert("high_latency", metadata.latency_ms))

        # Coherence check
        coherence = self.coherence_scorer(response)
        if coherence < self.coherence_threshold:
            alerts.append(Alert("low_coherence", coherence))

        return alerts
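
A usage sketch with stub scorers; in practice `toxicity_stub` and `coherence_stub` would wrap real classifier models (both names, and the `dispatch` sink, are placeholders):

from types import SimpleNamespace

def toxicity_stub(text):
    return 0.05   # stand-in for a real toxicity classifier

def coherence_stub(text):
    return 0.95   # stand-in for a real coherence scorer

monitor = OutputQualityMonitor(
    config={'confidence': 0.5, 'toxicity': 0.1, 'latency': 5000},
    toxicity_classifier=toxicity_stub,
    coherence_scorer=coherence_stub,
)

metadata = SimpleNamespace(confidence=0.42, latency_ms=6200)
alerts = monitor.check("some model response", metadata)
# -> low_confidence (0.42 < 0.5) and high_latency (6200ms > 5000ms)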

3. Model Drift Detection

Category: model_drift
Types: [data_drift, concept_drift, prediction_drift]
class DriftDetector:
    def __init__(self, baseline_window=1000):
        self.baseline_window = baseline_window
        self.baseline_inputs = []
        self.baseline_outputs = []

    def detect_data_drift(self, current_inputs):
        """Detect drift in the input distribution."""
        # Keep collecting until the baseline window is full
        if len(self.baseline_inputs) < self.baseline_window:
            self.baseline_inputs.extend(current_inputs)
            return None

        # KL divergence between baseline and current distributions
        baseline_dist = self._estimate_distribution(self.baseline_inputs)
        current_dist = self._estimate_distribution(current_inputs)
        kl_div = self._kl_divergence(baseline_dist, current_dist)

        if kl_div > 0.1:
            return DriftAlert("data_drift", kl_div)
        return None

    def detect_concept_drift(self, predictions, ground_truth):
        """Detect drift in model performance."""
        # Compare accuracy on a recent window against the baseline
        recent_accuracy = self._compute_accuracy(predictions, ground_truth)
        baseline_accuracy = self._baseline_accuracy()

        if baseline_accuracy - recent_accuracy > 0.05:  # 5% accuracy drop
            return DriftAlert("concept_drift", recent_accuracy)
        return None
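
The distribution and KL helpers are left abstract above. One plausible histogram-based implementation, assuming scalar features on a known range (helper names and the bin settings are illustrative):

import numpy as np

def estimate_distribution(samples, bins=50, value_range=(0.0, 1.0)):
    """Histogram-based density estimate over a fixed, shared grid."""
    hist, _ = np.histogram(samples, bins=bins, range=value_range)
    probs = hist.astype(float) + 1e-9  # smooth to avoid zero bins
    return probs / probs.sum()

def kl_divergence(p, q):
    """KL(p || q) for two discrete distributions on the same bins."""
    return float(np.sum(p * np.log(p / q)))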

4. Security Event Monitoring

Category: security_events
Events: [extraction_attempt, jailbreak, rate_abuse]
import time
from collections import defaultdict

class SecurityMonitor:
    def __init__(self):
        self.query_history = defaultdict(list)
        self.extraction_patterns = []

    def detect_extraction(self, user_id, queries):
        """Detect model extraction attempts."""
        history = self.query_history[user_id]
        history.extend(queries)

        # Systematic, high-volume, highly diverse querying suggests extraction
        if len(history) > 1000:  # High volume
            diversity = self._query_diversity(history)
            if diversity > 0.9:  # Very diverse
                return SecurityAlert("extraction_attempt", user_id)

        return None

    def detect_abuse(self, user_id, request_timestamps):
        """Detect rate limit abuse."""
        window = 60  # 1-minute sliding window
        now = time.time()
        recent = [t for t in request_timestamps if now - t < window]

        if len(recent) > 100:  # Too many requests per window
            return SecurityAlert("rate_abuse", user_id, len(recent))
        return None
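
`_query_diversity` is unspecified above; a simple sketch scores diversity as the fraction of distinct normalized queries (the function name and normalization are assumptions, not a fixed metric):

def query_diversity(queries):
    """Crude diversity score: fraction of distinct normalized queries.

    Extraction tooling tends to send many distinct probes, so a ratio
    near 1.0 over a large history is suspicious; repeated identical
    queries (ratio near 0) look more like ordinary retries.
    """
    normalized = {" ".join(q.lower().split()) for q in queries}
    return len(normalized) / max(len(queries), 1)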

Alert Configuration

Alert Thresholds:
  input_anomaly:
    warning: 2.5  # standard deviations
    critical: 4.0

  output_toxicity:
    warning: 0.3
    critical: 0.7

  model_drift:
    warning: 0.05  # 5% accuracy drop
    critical: 0.10

  extraction_queries:
    warning: 500/hour
    critical: 1000/hour
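
A sketch of loading these thresholds at startup, assuming the block above is saved as alert_thresholds.yaml (filename illustrative) and PyYAML is installed:

import yaml  # PyYAML

with open("alert_thresholds.yaml") as f:
    thresholds = yaml.safe_load(f)["Alert Thresholds"]

drift_warning = thresholds["model_drift"]["warning"]    # 0.05
drift_critical = thresholds["model_drift"]["critical"]  # 0.10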

Dashboard Metrics

┌──────────────────────────────────────────────────────────┐
│ REAL-TIME MONITORING DASHBOARD                           │
├──────────────────────────────────────────────────────────┤
│ Input Anomalies (1hr):  ████░░░░ 12 (2.4%)               │
│ Output Toxicity (1hr):  █░░░░░░░  3 (0.6%)               │
│ Model Latency P99:      ████████ 2.3s                    │
│ Drift Score:            ██░░░░░░ 0.02 (OK)               │
│ Security Alerts:        ░░░░░░░░ 0                       │
└──────────────────────────────────────────────────────────┘

Troubleshooting

Issue: Too many false positive alerts
Solution: Tune thresholds, add allowlists, improve baseline

Issue: Missing attack detection
Solution: Expand detection patterns, lower thresholds

Issue: High monitoring latency
Solution: Use sampling, async processing, optimize detectors
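
For the latency issue, one common pattern is to run cheap checks inline and push expensive detectors onto a sampled, off-request-path worker; a minimal sketch (the sample rate, pool size, and function names are assumptions):

import random
from concurrent.futures import ThreadPoolExecutor

SAMPLE_RATE = 0.1  # run heavy detectors on ~10% of traffic
_pool = ThreadPoolExecutor(max_workers=4)

def maybe_monitor(detector, payload, alert_sink):
    """Sampled, asynchronous monitoring to keep serving latency flat."""
    if random.random() >= SAMPLE_RATE:
        return  # skip this request entirely

    def run():
        alert = detector.detect(payload)
        if alert is not None:
            alert_sink(alert)

    _pool.submit(run)  # detector runs off the request path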

Integration Points

Component             Purpose
Agent 05              Configures monitoring
Agent 08              CI/CD integration
/report               Monitoring reports
Prometheus/Grafana    Metrics visualization
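
For the Prometheus/Grafana integration, a minimal sketch using the prometheus_client library; the metric names are illustrative, not a defined schema:

from prometheus_client import Counter, Gauge, start_http_server

input_anomalies = Counter("input_anomalies_total",
                          "Out-of-distribution inputs detected")
security_alerts = Counter("security_alerts_total",
                          "Security events", ["event_type"])
drift_score = Gauge("model_drift_score",
                    "Latest KL-divergence drift score")

start_http_server(9100)  # expose /metrics for Prometheus to scrape

# Inside the monitoring loop:
input_anomalies.inc()
security_alerts.labels(event_type="extraction_attempt").inc()
drift_score.set(0.02)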

Detect attacks and drift with real-time AI monitoring.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

prompt-hacking
red-team-frameworks
llm-jailbreaking
safety-filter-bypass