Anomaly Detection
Rule-based anomaly detection with cooldowns and error pattern tracking.
When to Use This Skill
- Detecting slow job degradation before failures
- Tracking error rate creep over time
- Identifying repeated error patterns
- Preventing alert fatigue with cooldowns
Core Concepts
Production systems fail in subtle ways - jobs getting slower, error rates creeping up, same errors repeating. The solution:
- Configurable rules with severity levels
- Cooldown periods to prevent alert storms
- Error pattern tracking for repeated failures
- Violation decay to reward recovery
Implementation
TypeScript
/** Categories of anomaly the detector can raise. */
enum AnomalyType {
  SLOW_JOB = 'slow_job',
  HIGH_FAILURE_RATE = 'high_failure_rate',
  WORKER_UNHEALTHY = 'worker_unhealthy',
  QUEUE_BACKLOG = 'queue_backlog',
  TIMEOUT_SPIKE = 'timeout_spike',
  REPEATED_ERROR = 'repeated_error',
  MEMORY_SPIKE = 'memory_spike',
  CPU_SPIKE = 'cpu_spike',
}
/** Alert severity; used for routing and sorting (critical sorts first). */
enum AnomalySeverity {
  CRITICAL = 'critical',
  HIGH = 'high',
  MEDIUM = 'medium',
  LOW = 'low',
}
/** A single detected anomaly, kept until explicitly resolved. */
interface AnomalyAlert {
  id: string;
  anomalyType: AnomalyType;
  severity: AnomalySeverity;
  workerName: string;
  /** Set for job-scoped anomalies only. */
  jobId?: string;
  /** Human-readable message built from the rule's message template. */
  message: string;
  /** Raw rule-context values captured at detection time. */
  details: Record<string, unknown>;
  detectedAt: Date;
  resolvedAt?: Date;
  /** Free-text note supplied when the alert is resolved. */
  resolution?: string;
}
/** Snapshot of worker/job metrics that rules are evaluated against. */
interface RuleContext {
  workerName: string;
  status: string;
  /** Failure percentage (0-100). */
  failureRate: number;
  queueDepth: number;
  durationMs: number;
  expectedDurationMs: number;
  timeoutCount: number;
  /** Only populated for job-level checks; zero during health checks. */
  errorRepeatCount: number;
  errorMessage: string;
  memoryMb: number;
  cpuPercent: number;
}
/** Declarative detection rule: fires when checkFn(ctx) returns true. */
interface AnomalyRule {
  anomalyType: AnomalyType;
  severity: AnomalySeverity;
  description: string;
  checkFn: (ctx: RuleContext) => boolean;
  /** "{field}" placeholders are filled in from the rule context. */
  messageTemplate: string;
  /** Suppress re-alerting for this long after the rule fires. */
  cooldownSeconds: number;
}
// Built-in detection rules. Thresholds are starting points — tune them
// against your own baseline metrics.
const ANOMALY_RULES: AnomalyRule[] = [
  {
    anomalyType: AnomalyType.SLOW_JOB,
    severity: AnomalySeverity.MEDIUM,
    description: 'Job execution time exceeds expected duration',
    // Fires at more than 2x the expected duration.
    checkFn: (ctx) => ctx.durationMs > ctx.expectedDurationMs * 2,
    messageTemplate: 'Job took {durationMs}ms, expected {expectedDurationMs}ms',
    cooldownSeconds: 300,
  },
  {
    anomalyType: AnomalyType.HIGH_FAILURE_RATE,
    severity: AnomalySeverity.HIGH,
    description: 'Worker failure rate exceeds threshold',
    checkFn: (ctx) => ctx.failureRate > 15,
    messageTemplate: 'Failure rate {failureRate}% exceeds 15% threshold',
    cooldownSeconds: 600,
  },
  {
    anomalyType: AnomalyType.WORKER_UNHEALTHY,
    severity: AnomalySeverity.CRITICAL,
    description: 'Worker health status is unhealthy',
    checkFn: (ctx) => ctx.status === 'unhealthy',
    messageTemplate: 'Worker {workerName} is unhealthy',
    cooldownSeconds: 300,
  },
  {
    anomalyType: AnomalyType.QUEUE_BACKLOG,
    severity: AnomalySeverity.MEDIUM,
    description: 'Queue depth exceeds threshold',
    checkFn: (ctx) => ctx.queueDepth > 50,
    messageTemplate: 'Queue depth {queueDepth} exceeds threshold',
    cooldownSeconds: 300,
  },
  {
    anomalyType: AnomalyType.REPEATED_ERROR,
    severity: AnomalySeverity.HIGH,
    description: 'Same error repeated multiple times',
    checkFn: (ctx) => ctx.errorRepeatCount > 5,
    messageTemplate: 'Error "{errorMessage}" repeated {errorRepeatCount} times',
    cooldownSeconds: 900,
  },
  {
    anomalyType: AnomalyType.MEMORY_SPIKE,
    severity: AnomalySeverity.HIGH,
    description: 'Memory usage exceeds threshold',
    checkFn: (ctx) => ctx.memoryMb > 1024,
    messageTemplate: 'Memory usage {memoryMb}MB exceeds 1GB threshold',
    cooldownSeconds: 300,
  },
];
class AnomalyDetector { private alerts = new Map<string, AnomalyAlert>(); private cooldowns = new Map<string, Date>(); private errorCounts = new Map<string, Map<string, number>>(); private timeoutCounts = new Map<string, number>(); private alertIdCounter = 0;
checkWorkerHealth( workerName: string, health: { status: string; jobsProcessed: number; jobsFailed: number; queueDepth: number; lastDurationMs: number; expectedDurationMs: number; memoryMb: number; cpuPercent: number; } ): AnomalyAlert[] { const detected: AnomalyAlert[] = [];
const failureRate = health.jobsProcessed > 0
? (health.jobsFailed / health.jobsProcessed) * 100
: 0;
const ctx: RuleContext = {
workerName,
status: health.status,
failureRate,
queueDepth: health.queueDepth,
durationMs: health.lastDurationMs,
expectedDurationMs: health.expectedDurationMs,
timeoutCount: this.timeoutCounts.get(workerName) || 0,
errorRepeatCount: 0,
errorMessage: '',
memoryMb: health.memoryMb,
cpuPercent: health.cpuPercent,
};
for (const rule of ANOMALY_RULES) {
if (this.isOnCooldown(workerName, rule.anomalyType)) continue;
if (rule.checkFn(ctx)) {
const alert = this.createAlert(workerName, rule, ctx);
detected.push(alert);
this.setCooldown(workerName, rule.anomalyType, rule.cooldownSeconds);
}
}
return detected;
}
checkJobExecution( workerName: string, jobId: string, durationMs: number, expectedDurationMs: number, success: boolean, error?: string ): AnomalyAlert[] { const detected: AnomalyAlert[] = [];
if (!success && error) {
this.trackError(workerName, error);
}
// Check slow job
if (durationMs > expectedDurationMs * 2) {
if (!this.isOnCooldown(workerName, AnomalyType.SLOW_JOB)) {
const rule = ANOMALY_RULES[0];
const alert = this.createAlert(workerName, rule, {
durationMs,
expectedDurationMs,
} as RuleContext);
alert.jobId = jobId;
detected.push(alert);
this.setCooldown(workerName, AnomalyType.SLOW_JOB, 300);
}
}
// Check repeated errors
if (error) {
const errorCounts = this.errorCounts.get(workerName);
const count = errorCounts?.get(error.slice(0, 200)) || 0;
if (count > 5 && !this.isOnCooldown(workerName, AnomalyType.REPEATED_ERROR)) {
const rule = ANOMALY_RULES.find(r => r.anomalyType === AnomalyType.REPEATED_ERROR)!;
const alert = this.createAlert(workerName, rule, {
errorMessage: error.slice(0, 100),
errorRepeatCount: count,
} as RuleContext);
detected.push(alert);
this.setCooldown(workerName, AnomalyType.REPEATED_ERROR, 900);
}
}
return detected;
}
resolveAnomaly(alertId: string, resolution: string): boolean { const alert = this.alerts.get(alertId); if (!alert || alert.resolvedAt) return false;
alert.resolvedAt = new Date();
alert.resolution = resolution;
return true;
}
getActiveAnomalies(): AnomalyAlert[] { return Array.from(this.alerts.values()) .filter(a => !a.resolvedAt) .sort((a, b) => { const order = { critical: 0, high: 1, medium: 2, low: 3 }; return order[a.severity] - order[b.severity]; }); }
private trackError(workerName: string, error: string): void { if (!this.errorCounts.has(workerName)) { this.errorCounts.set(workerName, new Map()); } const counts = this.errorCounts.get(workerName)!; const key = error.slice(0, 200); counts.set(key, (counts.get(key) || 0) + 1); }
private createAlert(workerName: string, rule: AnomalyRule, ctx: Partial<RuleContext>): AnomalyAlert {
const id = anomaly_${++this.alertIdCounter}_${Date.now()};
let message = rule.messageTemplate;
for (const [key, value] of Object.entries(ctx)) {
message = message.replace(`{${key}}`, String(value));
}
const alert: AnomalyAlert = {
id,
anomalyType: rule.anomalyType,
severity: rule.severity,
workerName,
message,
details: ctx as Record<string, unknown>,
detectedAt: new Date(),
};
this.alerts.set(id, alert);
return alert;
}
private isOnCooldown(workerName: string, anomalyType: AnomalyType): boolean {
const key = ${workerName}:${anomalyType};
const cooldownEnd = this.cooldowns.get(key);
return cooldownEnd !== undefined && cooldownEnd > new Date();
}
private setCooldown(workerName: string, anomalyType: AnomalyType, seconds: number): void {
const key = ${workerName}:${anomalyType};
this.cooldowns.set(key, new Date(Date.now() + seconds * 1000));
}
}
// Lazily-created process-wide singleton instance.
let detector: AnomalyDetector | null = null;

/** Returns the shared AnomalyDetector, creating it on first use. */
export function getAnomalyDetector(): AnomalyDetector {
  detector ??= new AnomalyDetector();
  return detector;
}
Python
from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Dict, List, Optional, Callable from enum import Enum
class AnomalyType(str, Enum):
    """Categories of anomaly the detector can raise."""
    SLOW_JOB = "slow_job"
    HIGH_FAILURE_RATE = "high_failure_rate"
    WORKER_UNHEALTHY = "worker_unhealthy"
    QUEUE_BACKLOG = "queue_backlog"
    TIMEOUT_SPIKE = "timeout_spike"
    REPEATED_ERROR = "repeated_error"
    MEMORY_SPIKE = "memory_spike"
class AnomalySeverity(str, Enum):
    """Alert severity; used for routing and sorting (critical sorts first)."""
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
@dataclass
class AnomalyAlert:
    """A single detected anomaly, kept until explicitly resolved."""
    id: str
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    worker_name: str
    message: str  # human-readable, built from the rule's message template
    details: Dict  # raw RuleContext values captured at detection time
    detected_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    job_id: Optional[str] = None  # set for job-scoped anomalies only
    resolved_at: Optional[datetime] = None
    resolution: Optional[str] = None  # free-text note supplied at resolution
@dataclass
class RuleContext:
    """Snapshot of worker/job metrics that rules are evaluated against."""
    worker_name: str
    status: str = "healthy"
    failure_rate: float = 0.0  # percentage, 0-100
    queue_depth: int = 0
    duration_ms: float = 0.0
    expected_duration_ms: float = 0.0
    timeout_count: int = 0
    error_repeat_count: int = 0  # only populated for job-level checks
    error_message: str = ""
    memory_mb: float = 0.0
    cpu_percent: float = 0.0
@dataclass
class AnomalyRule:
    """Declarative detection rule: fires when check_fn(ctx) is True."""
    anomaly_type: AnomalyType
    severity: AnomalySeverity
    description: str
    check_fn: Callable[[RuleContext], bool]
    message_template: str  # "{field}" placeholders filled from RuleContext
    cooldown_seconds: int  # suppress re-alerting for this long after firing
# Built-in detection rules. Thresholds are starting points -- tune them
# against your own baseline metrics.
ANOMALY_RULES: List[AnomalyRule] = [
    AnomalyRule(
        anomaly_type=AnomalyType.SLOW_JOB,
        severity=AnomalySeverity.MEDIUM,
        description="Job execution time exceeds expected duration",
        # Fires at more than 2x the expected duration.
        check_fn=lambda ctx: ctx.duration_ms > ctx.expected_duration_ms * 2,
        message_template="Job took {duration_ms}ms, expected {expected_duration_ms}ms",
        cooldown_seconds=300,
    ),
    AnomalyRule(
        anomaly_type=AnomalyType.HIGH_FAILURE_RATE,
        severity=AnomalySeverity.HIGH,
        description="Worker failure rate exceeds threshold",
        check_fn=lambda ctx: ctx.failure_rate > 15,
        message_template="Failure rate {failure_rate}% exceeds 15% threshold",
        cooldown_seconds=600,
    ),
    AnomalyRule(
        anomaly_type=AnomalyType.WORKER_UNHEALTHY,
        severity=AnomalySeverity.CRITICAL,
        description="Worker health status is unhealthy",
        check_fn=lambda ctx: ctx.status == "unhealthy",
        message_template="Worker {worker_name} is unhealthy",
        cooldown_seconds=300,
    ),
    AnomalyRule(
        anomaly_type=AnomalyType.REPEATED_ERROR,
        severity=AnomalySeverity.HIGH,
        description="Same error repeated multiple times",
        check_fn=lambda ctx: ctx.error_repeat_count > 5,
        message_template='Error "{error_message}" repeated {error_repeat_count} times',
        cooldown_seconds=900,
    ),
]
class AnomalyDetector:
    """Rule-based anomaly detector with per-worker cooldowns and
    repeated-error pattern tracking.

    All state is held in memory; alerts persist until explicitly
    resolved via ``resolve_anomaly``.
    """

    def __init__(self) -> None:
        # FIX: original declared ``def init`` (missing dunder underscores),
        # so these attributes were never initialized and every method
        # raised AttributeError.
        self._alerts: Dict[str, AnomalyAlert] = {}
        self._cooldowns: Dict[str, datetime] = {}
        self._error_counts: Dict[str, Dict[str, int]] = {}
        self._timeout_counts: Dict[str, int] = {}
        self._alert_counter = 0

    def check_worker_health(
        self,
        worker_name: str,
        status: str,
        jobs_processed: int,
        jobs_failed: int,
        queue_depth: int,
        last_duration_ms: float,
        expected_duration_ms: float,
        memory_mb: float = 0,
        cpu_percent: float = 0,
    ) -> List[AnomalyAlert]:
        """Evaluate every configured rule against a worker health snapshot.

        Rules on cooldown are skipped; a firing rule starts its cooldown.
        Returns the alerts raised by this check (also stored internally).
        """
        detected: List[AnomalyAlert] = []
        # Avoid divide-by-zero before any jobs have been processed.
        failure_rate = (jobs_failed / jobs_processed * 100) if jobs_processed > 0 else 0
        # error_message / error_repeat_count stay at their zero defaults so
        # REPEATED_ERROR can only fire from check_job_execution.
        ctx = RuleContext(
            worker_name=worker_name,
            status=status,
            failure_rate=failure_rate,
            queue_depth=queue_depth,
            duration_ms=last_duration_ms,
            expected_duration_ms=expected_duration_ms,
            timeout_count=self._timeout_counts.get(worker_name, 0),
            memory_mb=memory_mb,
            cpu_percent=cpu_percent,
        )
        for rule in ANOMALY_RULES:
            if self._is_on_cooldown(worker_name, rule.anomaly_type):
                continue
            if rule.check_fn(ctx):
                alert = self._create_alert(worker_name, rule, ctx)
                detected.append(alert)
                self._set_cooldown(worker_name, rule.anomaly_type, rule.cooldown_seconds)
        return detected

    def check_job_execution(
        self,
        worker_name: str,
        job_id: str,
        duration_ms: float,
        expected_duration_ms: float,
        success: bool,
        error: Optional[str] = None,
    ) -> List[AnomalyAlert]:
        """Record one job execution; raise slow-job / repeated-error alerts.

        Failed executions also feed the per-worker error tracker.
        """
        detected: List[AnomalyAlert] = []
        if not success and error:
            self._track_error(worker_name, error)
        # Slow job: more than 2x the expected duration.
        if duration_ms > expected_duration_ms * 2:
            if not self._is_on_cooldown(worker_name, AnomalyType.SLOW_JOB):
                # Look the rule up by type instead of relying on list position.
                rule = next(
                    r for r in ANOMALY_RULES if r.anomaly_type == AnomalyType.SLOW_JOB
                )
                ctx = RuleContext(
                    worker_name=worker_name,
                    duration_ms=duration_ms,
                    expected_duration_ms=expected_duration_ms,
                )
                alert = self._create_alert(worker_name, rule, ctx)
                alert.job_id = job_id
                detected.append(alert)
                self._set_cooldown(
                    worker_name, AnomalyType.SLOW_JOB, rule.cooldown_seconds
                )
        # Repeated error: same message (keyed on first 200 chars) seen > 5 times.
        if error:
            error_counts = self._error_counts.get(worker_name, {})
            count = error_counts.get(error[:200], 0)
            if count > 5 and not self._is_on_cooldown(worker_name, AnomalyType.REPEATED_ERROR):
                rule = next(
                    r for r in ANOMALY_RULES if r.anomaly_type == AnomalyType.REPEATED_ERROR
                )
                ctx = RuleContext(
                    worker_name=worker_name,
                    error_message=error[:100],
                    error_repeat_count=count,
                )
                alert = self._create_alert(worker_name, rule, ctx)
                detected.append(alert)
                self._set_cooldown(
                    worker_name, AnomalyType.REPEATED_ERROR, rule.cooldown_seconds
                )
        return detected

    def resolve_anomaly(self, alert_id: str, resolution: str) -> bool:
        """Mark an alert resolved. False if unknown or already resolved."""
        alert = self._alerts.get(alert_id)
        if not alert or alert.resolved_at:
            return False
        alert.resolved_at = datetime.now(timezone.utc)
        alert.resolution = resolution
        return True

    def get_active_anomalies(self) -> List[AnomalyAlert]:
        """Unresolved alerts, most severe first (stable within a severity)."""
        severity_order = {"critical": 0, "high": 1, "medium": 2, "low": 3}
        return sorted(
            (a for a in self._alerts.values() if not a.resolved_at),
            key=lambda a: severity_order[a.severity.value],
        )

    def _track_error(self, worker_name: str, error: str) -> None:
        """Increment the repeat count for an error message (first 200 chars)."""
        counts = self._error_counts.setdefault(worker_name, {})
        key = error[:200]
        counts[key] = counts.get(key, 0) + 1

    def _create_alert(self, worker_name: str, rule: AnomalyRule, ctx: RuleContext) -> AnomalyAlert:
        """Build, store, and return a new alert; fill the message template."""
        self._alert_counter += 1
        alert_id = f"anomaly_{self._alert_counter}_{int(datetime.now(timezone.utc).timestamp() * 1000)}"
        message = rule.message_template
        for key, value in vars(ctx).items():
            message = message.replace(f"{{{key}}}", str(value))
        alert = AnomalyAlert(
            id=alert_id,
            anomaly_type=rule.anomaly_type,
            severity=rule.severity,
            worker_name=worker_name,
            message=message,
            # Copy so later mutation of ctx cannot change the stored details.
            details=dict(vars(ctx)),
        )
        self._alerts[alert_id] = alert
        return alert

    def _is_on_cooldown(self, worker_name: str, anomaly_type: AnomalyType) -> bool:
        """True while the (worker, anomaly type) pair is still cooling down."""
        key = f"{worker_name}:{anomaly_type.value}"
        cooldown_end = self._cooldowns.get(key)
        return cooldown_end is not None and cooldown_end > datetime.now(timezone.utc)

    def _set_cooldown(self, worker_name: str, anomaly_type: AnomalyType, seconds: int) -> None:
        """Start (or restart) the cooldown for a (worker, anomaly type) pair."""
        from datetime import timedelta
        key = f"{worker_name}:{anomaly_type.value}"
        self._cooldowns[key] = datetime.now(timezone.utc) + timedelta(seconds=seconds)
# Singleton
# Lazily-created process-wide singleton instance.
_detector: Optional[AnomalyDetector] = None


def get_anomaly_detector() -> AnomalyDetector:
    """Return the shared AnomalyDetector, creating it on first call."""
    global _detector
    if _detector is None:
        _detector = AnomalyDetector()
    return _detector
Usage Examples
Worker Job Monitoring
const detector = getAnomalyDetector();

/**
 * Runs a job, reports its execution metrics to the anomaly detector, and
 * forwards any raised alerts to ops. Rethrows the original job error.
 */
async function executeJob(job: Job) {
  const startTime = Date.now();
  try {
    await processJob(job);
    const duration = Date.now() - startTime;
    const alerts = detector.checkJobExecution(
      'data-processor',
      job.id,
      duration,
      30000, // Expected 30s
      true
    );
    for (const alert of alerts) {
      await notifyOps(alert);
    }
  } catch (error: unknown) {
    const duration = Date.now() - startTime;
    // FIX: catch variables are `unknown` under strict mode — narrow
    // before reading `.message` instead of assuming an Error instance.
    const message = error instanceof Error ? error.message : String(error);
    const alerts = detector.checkJobExecution(
      'data-processor',
      job.id,
      duration,
      30000,
      false,
      message
    );
    for (const alert of alerts) {
      await notifyOps(alert);
    }
    throw error;
  }
}
Periodic Health Checks
// Poll worker health every 30s and route alerts by severity.
setInterval(async () => {
  const health = await getWorkerHealth('data-processor');
  const alerts = detector.checkWorkerHealth('data-processor', health);
  for (const alert of alerts) {
    if (alert.severity === 'critical') {
      // Critical anomalies page the on-call engineer.
      await pageOnCall(alert);
    } else {
      // Everything else goes to the Slack channel.
      await notifySlack(alert);
    }
  }
}, 30000);
Best Practices
- Set cooldowns long enough to prevent alert storms
- Use severity levels to route alerts appropriately
- Track error patterns to catch repeated failures
- Clean up old resolved alerts periodically
- Tune thresholds based on your baseline metrics
Common Mistakes
- Cooldowns too short (alert fatigue)
- No error pattern tracking (miss repeated failures)
- Same severity for all alerts (everything becomes noise)
- Not resolving alerts (dashboard becomes useless)
- Thresholds too sensitive (false positives)
Related Patterns
- health-checks - Source of health data for anomaly detection
- logging-observability - Structured logging for alert context
- graceful-degradation - Response to detected anomalies