outbox-pattern

Outbox Pattern Implementation

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "outbox-pattern" with this command: npx skills add ronnythedev/dotnet-clean-architecture-skills/ronnythedev-dotnet-clean-architecture-skills-outbox-pattern

Outbox Pattern Implementation

Overview

The Outbox pattern ensures reliable event processing:

  • Atomic persistence - Events saved in same transaction as aggregate

  • Guaranteed delivery - Events processed even if app crashes

  • Eventual consistency - Async processing with retry

  • Idempotency - Handle duplicate processing gracefully

Quick Reference

| Component | Purpose |
|-----------|---------|
| OutboxMessage | Persisted event entity |
| OutboxMessageConfiguration | EF Core mapping |
| ProcessOutboxMessagesJob | Background processor (Quartz) |
| IdempotentDomainEventHandler | Deduplicated handler wrapper |
| OutboxConsumer | Alternative direct DB poller |

Outbox Structure

/Infrastructure/
├── Outbox/
│   ├── OutboxMessage.cs
│   ├── OutboxMessageConfiguration.cs
│   ├── ProcessOutboxMessagesJob.cs
│   ├── ProcessOutboxMessagesJobSetup.cs
│   └── IdempotentDomainEventHandler.cs
└── ApplicationDbContext.cs

Template: Outbox Message Entity

// src/{name}.infrastructure/Outbox/OutboxMessage.cs
namespace {name}.infrastructure.outbox;

/// <summary>
/// Represents a domain event stored for reliable delivery
/// </summary>
public sealed class OutboxMessage
{
    /// <summary>
    /// Creates an empty message; every property keeps its default value
    /// until it is assigned (for example through an object initializer)
    /// </summary>
    public OutboxMessage()
    {
    }

    /// <summary>
    /// Creates a message with its identity, event type, payload and timestamp set
    /// </summary>
    /// <param name="id">Unique identifier for this message</param>
    /// <param name="type">Assembly-qualified type name of the domain event</param>
    /// <param name="content">JSON-serialized event content</param>
    /// <param name="occurredOnUtc">When the event originally occurred</param>
    public OutboxMessage(Guid id, string type, string content, DateTime occurredOnUtc)
    {
        Id = id;
        Type = type;
        Content = content;
        OccurredOnUtc = occurredOnUtc;
    }

    /// <summary>
    /// Unique identifier for this message
    /// </summary>
    public Guid Id { get; set; }

    /// <summary>
    /// Assembly-qualified type name of the domain event
    /// </summary>
    public string Type { get; set; } = string.Empty;

    /// <summary>
    /// JSON-serialized event content
    /// </summary>
    public string Content { get; set; } = string.Empty;

    /// <summary>
    /// When the event originally occurred
    /// </summary>
    public DateTime OccurredOnUtc { get; set; }

    /// <summary>
    /// When the message was successfully processed (null if not yet processed)
    /// </summary>
    public DateTime? ProcessedOnUtc { get; set; }

    /// <summary>
    /// Error message if processing failed
    /// </summary>
    public string? Error { get; set; }

    /// <summary>
    /// Number of processing attempts
    /// </summary>
    public int RetryCount { get; set; }
}

Template: EF Core Configuration

// src/{name}.infrastructure/Outbox/OutboxMessageConfiguration.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

namespace {name}.infrastructure.outbox;

/// <summary>
/// EF Core mapping of <see cref="OutboxMessage"/> to the outbox_message table
/// </summary>
internal sealed class OutboxMessageConfiguration : IEntityTypeConfiguration<OutboxMessage>
{
    /// <summary>
    /// Configures the table name, key, column constraints and the two partial indexes
    /// </summary>
    /// <param name="builder">Builder for the <see cref="OutboxMessage"/> entity type</param>
    public void Configure(EntityTypeBuilder<OutboxMessage> builder)
    {
        builder.ToTable("outbox_message");

        builder.HasKey(o => o.Id);

        // Ids are assigned by the application, never by the database
        builder.Property(o => o.Id)
            .ValueGeneratedNever();

        builder.Property(o => o.Type)
            .HasMaxLength(500)
            .IsRequired();

        builder.Property(o => o.Content)
            .HasColumnType("jsonb")  // PostgreSQL JSONB
            .IsRequired();

        builder.Property(o => o.OccurredOnUtc)
            .IsRequired();

        builder.Property(o => o.ProcessedOnUtc);

        builder.Property(o => o.Error)
            .HasColumnType("text");

        builder.Property(o => o.RetryCount)
            .HasDefaultValue(0);

        // NOTE(review): both filters are raw SQL and assume the column is named
        // processed_on_utc (as in the migration script) - confirm a snake_case
        // naming convention is configured for the context.

        // Index for efficient polling of unprocessed messages.
        // Keyed on OccurredOnUtc because the poller orders by it, and because a second
        // HasIndex call over the same property would reconfigure the existing index
        // instead of creating another one.
        builder.HasIndex(o => o.OccurredOnUtc)
            .HasFilter("processed_on_utc IS NULL")
            .HasDatabaseName("ix_outbox_message_unprocessed");

        // Index for cleanup of old processed messages
        builder.HasIndex(o => o.ProcessedOnUtc)
            .HasFilter("processed_on_utc IS NOT NULL")
            .HasDatabaseName("ix_outbox_message_processed");
    }
}

Template: DbContext Integration

// src/{name}.infrastructure/ApplicationDbContext.cs
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using {name}.domain.abstractions;
using {name}.infrastructure.outbox;

namespace {name}.infrastructure;

/// <summary>
/// Application DbContext that also acts as the unit of work.
/// Every save converts pending domain events into outbox messages first,
/// so events and aggregate changes are persisted by the same SaveChanges call.
/// </summary>
public sealed class ApplicationDbContext : DbContext, IUnitOfWork
{
    // Serializer settings used when writing event payloads to the outbox
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        WriteIndented = false
    };

    public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
        : base(options)
    {
    }

    /// <summary>
    /// Outbox messages waiting to be (or already) processed
    /// </summary>
    public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();

    /// <summary>
    /// Applies every entity configuration found in this assembly and maps the
    /// idempotency tracking entity used by the idempotent handler wrapper
    /// </summary>
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.ApplyConfigurationsFromAssembly(typeof(ApplicationDbContext).Assembly);

        // Set<OutboxMessageConsumer>() throws unless the type is part of the model,
        // so it is mapped here. Names mirror the migration script.
        modelBuilder.Entity<OutboxMessageConsumer>(consumer =>
        {
            consumer.ToTable("outbox_message_consumer");

            consumer.HasKey(c => c.Id);

            consumer.Property(c => c.Id)
                .ValueGeneratedNever();

            consumer.Property(c => c.HandlerName)
                .HasMaxLength(500)
                .IsRequired();

            consumer.HasIndex(c => new { c.EventId, c.HandlerName })
                .IsUnique()
                .HasDatabaseName("ix_outbox_consumer_event_handler");
        });

        base.OnModelCreating(modelBuilder);
    }

    /// <summary>
    /// Saves all tracked changes after turning pending domain events into outbox messages
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to the underlying save</param>
    /// <returns>The number of state entries written, as reported by the base implementation</returns>
    public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
    {
        // ═══════════════════════════════════════════════════════════════
        // CRITICAL: Add domain events to outbox BEFORE SaveChanges
        // This ensures atomic persistence - events saved in same transaction
        // ═══════════════════════════════════════════════════════════════
        ConvertDomainEventsToOutboxMessages();

        return await base.SaveChangesAsync(cancellationToken);
    }

    /// <summary>
    /// Moves every domain event raised by tracked entities into the outbox set
    /// </summary>
    private void ConvertDomainEventsToOutboxMessages()
    {
        // Get all entities with domain events
        var entitiesWithEvents = ChangeTracker
            .Entries<Entity>()
            .Where(e => e.Entity.GetDomainEvents().Any())
            .Select(e => e.Entity)
            .ToList();

        // Extract all domain events (materialized before the entities are cleared)
        var domainEvents = entitiesWithEvents
            .SelectMany(e => e.GetDomainEvents())
            .ToList();

        // Clear events from entities (they're now in outbox)
        foreach (var entity in entitiesWithEvents)
        {
            entity.ClearDomainEvents();
        }

        // Convert to outbox messages
        foreach (var domainEvent in domainEvents)
        {
            // Serialize with the runtime type so derived event properties are included
            var eventType = domainEvent.GetType();

            var outboxMessage = new OutboxMessage
            {
                Id = Guid.NewGuid(),
                Type = eventType.AssemblyQualifiedName!,
                Content = JsonSerializer.Serialize(domainEvent, eventType, JsonOptions),
                OccurredOnUtc = DateTime.UtcNow
            };

            OutboxMessages.Add(outboxMessage);
        }
    }
}

Template: Outbox Processor Job (Quartz)

// src/{name}.infrastructure/Outbox/ProcessOutboxMessagesJob.cs
using System.Text.Json;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;
using {name}.domain.abstractions;

namespace {name}.infrastructure.outbox;

/// <summary>
/// Background job that processes outbox messages
/// Uses Quartz for scheduling with configurable interval
/// </summary>
[DisallowConcurrentExecution] // Prevent parallel execution
public sealed class ProcessOutboxMessagesJob : IJob
{
    private const int BatchSize = 20;
    private const int MaxRetries = 3;

    // Must stay compatible with the options used when the events were serialized
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase
    };

    private readonly ApplicationDbContext _dbContext;
    private readonly IPublisher _publisher;
    private readonly ILogger<ProcessOutboxMessagesJob> _logger;

    public ProcessOutboxMessagesJob(
        ApplicationDbContext dbContext,
        IPublisher publisher,
        ILogger<ProcessOutboxMessagesJob> logger)
    {
        _dbContext = dbContext;
        _publisher = publisher;
        _logger = logger;
    }

    /// <summary>
    /// Loads one batch of unprocessed messages, publishes each one and then
    /// saves the updated message state in a single SaveChanges call
    /// </summary>
    /// <param name="context">Quartz execution context; its cancellation token is flowed through</param>
    public async Task Execute(IJobExecutionContext context)
    {
        _logger.LogDebug("Starting outbox message processing...");

        var messages = await GetUnprocessedMessages(context.CancellationToken);

        if (messages.Count == 0)
        {
            _logger.LogDebug("No outbox messages to process");
            return;
        }

        _logger.LogInformation(
            "Processing {Count} outbox messages",
            messages.Count);

        foreach (var message in messages)
        {
            await ProcessMessage(message, context.CancellationToken);
        }

        await _dbContext.SaveChangesAsync(context.CancellationToken);

        _logger.LogInformation("Completed outbox message processing");
    }

    /// <summary>
    /// Returns up to <see cref="BatchSize"/> messages that are not yet processed and
    /// still have retries left, oldest first
    /// </summary>
    private async Task<List<OutboxMessage>> GetUnprocessedMessages(
        CancellationToken cancellationToken)
    {
        return await _dbContext.OutboxMessages
            .Where(m => m.ProcessedOnUtc == null)
            .Where(m => m.RetryCount < MaxRetries)
            .OrderBy(m => m.OccurredOnUtc)
            .Take(BatchSize)
            .ToListAsync(cancellationToken);
    }

    /// <summary>
    /// Resolves, deserializes and publishes a single message, recording the outcome
    /// on the message itself. Handler failures are captured on the message instead of
    /// being thrown; only cancellation of the job propagates to the caller.
    /// </summary>
    private async Task ProcessMessage(
        OutboxMessage message,
        CancellationToken cancellationToken)
    {
        try
        {
            _logger.LogDebug(
                "Processing outbox message {MessageId} of type {Type}",
                message.Id,
                message.Type);

            // Resolve the event type
            var eventType = Type.GetType(message.Type);

            if (eventType is null)
            {
                _logger.LogError(
                    "Could not resolve type {Type} for message {MessageId}",
                    message.Type,
                    message.Id);

                // Retrying cannot help, so the message is closed with an error
                message.Error = $"Could not resolve type: {message.Type}";
                message.ProcessedOnUtc = DateTime.UtcNow;
                return;
            }

            // Deserialize the event
            var domainEvent = JsonSerializer.Deserialize(
                message.Content,
                eventType,
                JsonOptions) as IDomainEvent;

            if (domainEvent is null)
            {
                _logger.LogError(
                    "Could not deserialize message {MessageId}",
                    message.Id);

                message.Error = "Could not deserialize message content";
                message.ProcessedOnUtc = DateTime.UtcNow;
                return;
            }

            // Publish to MediatR handlers
            await _publisher.Publish(domainEvent, cancellationToken);

            // Mark as processed
            message.ProcessedOnUtc = DateTime.UtcNow;
            message.Error = null;

            _logger.LogInformation(
                "Successfully processed outbox message {MessageId}",
                message.Id);
        }
        catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
        {
            // The job itself is being cancelled; this is not a handler failure,
            // so it must not consume one of the message's retries
            throw;
        }
        catch (Exception ex)
        {
            // Increment first so the log shows the attempt count that will be stored
            message.RetryCount++;
            message.Error = ex.ToString();

            _logger.LogError(
                ex,
                "Error processing outbox message {MessageId}. Retry count: {RetryCount}",
                message.Id,
                message.RetryCount);

            // Mark as processed if max retries exceeded
            if (message.RetryCount >= MaxRetries)
            {
                message.ProcessedOnUtc = DateTime.UtcNow;

                _logger.LogError(
                    "Outbox message {MessageId} exceeded max retries and has been marked as failed",
                    message.Id);
            }
        }
    }
}

Template: Job Configuration

// src/{name}.infrastructure/Outbox/ProcessOutboxMessagesJobSetup.cs
using Microsoft.Extensions.Options;
using Quartz;

namespace {name}.infrastructure.outbox;

/// <summary>
/// Registers <see cref="ProcessOutboxMessagesJob"/> with Quartz together with a
/// trigger that repeats forever at a fixed interval
/// </summary>
internal sealed class ProcessOutboxMessagesJobSetup : IConfigureOptions<QuartzOptions>
{
    /// <summary>
    /// Adds the job and its repeating trigger to the Quartz options
    /// </summary>
    /// <param name="options">Quartz options being configured</param>
    public void Configure(QuartzOptions options)
    {
        // The same key ties the trigger to the job registered below
        var jobKey = JobKey.Create(nameof(ProcessOutboxMessagesJob));

        options
            .AddJob<ProcessOutboxMessagesJob>(jobBuilder =>
                jobBuilder.WithIdentity(jobKey))
            .AddTrigger(triggerBuilder =>
                triggerBuilder
                    .ForJob(jobKey)
                    .WithSimpleSchedule(schedule =>
                        schedule
                            .WithIntervalInSeconds(10)  // Poll every 10 seconds
                            .RepeatForever()));
    }
}

Template: Idempotent Event Handler Wrapper

// src/{name}.infrastructure/Outbox/IdempotentDomainEventHandler.cs
using MediatR;
using Microsoft.EntityFrameworkCore;
using {name}.domain.abstractions;

namespace {name}.infrastructure.outbox;

/// <summary>
/// Wrapper that ensures domain events are processed only once
/// Uses a separate tracking table to detect duplicates
/// </summary>
/// <typeparam name="TEvent">Domain event type handled by the derived class</typeparam>
public abstract class IdempotentDomainEventHandler<TEvent> : INotificationHandler<TEvent>
    where TEvent : IDomainEvent
{
    private readonly ApplicationDbContext _dbContext;

    protected IdempotentDomainEventHandler(ApplicationDbContext dbContext)
    {
        _dbContext = dbContext;
    }

    /// <summary>
    /// Runs <see cref="HandleAsync"/> unless this handler has already recorded the
    /// event, then records the event as processed by this handler
    /// </summary>
    /// <param name="notification">Event to handle; its Id identifies duplicates</param>
    /// <param name="cancellationToken">Token flowed to the lookup, the handler and the save</param>
    public async Task Handle(TEvent notification, CancellationToken cancellationToken)
    {
        // Records are keyed per handler type, so several handlers can each
        // process the same event once
        var handlerName = GetType().Name;
        var eventId = notification.Id;

        // Check if already processed
        var alreadyProcessed = await _dbContext
            .Set<OutboxMessageConsumer>()
            .AnyAsync(
                c => c.EventId == eventId && c.HandlerName == handlerName,
                cancellationToken);

        if (alreadyProcessed)
        {
            return;  // Skip duplicate processing
        }

        // Process the event
        await HandleAsync(notification, cancellationToken);

        // Mark as processed
        _dbContext.Set<OutboxMessageConsumer>().Add(new OutboxMessageConsumer
        {
            Id = Guid.NewGuid(),
            EventId = eventId,
            HandlerName = handlerName,
            ProcessedOnUtc = DateTime.UtcNow
        });

        await _dbContext.SaveChangesAsync(cancellationToken);
    }

    /// <summary>
    /// Handler logic supplied by the derived class; called only when no
    /// processing record exists for this event and handler
    /// </summary>
    protected abstract Task HandleAsync(TEvent notification, CancellationToken cancellationToken);
}

/// <summary>
/// Tracks which handlers have processed which events
/// </summary>
public sealed class OutboxMessageConsumer
{
    /// <summary>
    /// Unique identifier of this tracking record
    /// </summary>
    public Guid Id { get; set; }

    /// <summary>
    /// Identifier of the event that was processed
    /// </summary>
    public Guid EventId { get; set; }

    /// <summary>
    /// Name of the handler that processed the event
    /// </summary>
    public string HandlerName { get; set; } = string.Empty;

    /// <summary>
    /// When the handler processed the event
    /// </summary>
    public DateTime ProcessedOnUtc { get; set; }
}

Template: Cleanup Job

// src/{name}.infrastructure/Outbox/CleanupOutboxMessagesJob.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;

namespace {name}.infrastructure.outbox;

/// <summary>
/// Cleans up old processed outbox messages
/// Runs daily to prevent table bloat
/// </summary>
[DisallowConcurrentExecution]
public sealed class CleanupOutboxMessagesJob : IJob
{
    private const int RetentionDays = 7;
    private const int BatchSize = 1000;

    private readonly ApplicationDbContext _dbContext;
    private readonly ILogger<CleanupOutboxMessagesJob> _logger;

    public CleanupOutboxMessagesJob(
        ApplicationDbContext dbContext,
        ILogger<CleanupOutboxMessagesJob> logger)
    {
        _dbContext = dbContext;
        _logger = logger;
    }

    /// <summary>
    /// Deletes messages processed more than <see cref="RetentionDays"/> days ago,
    /// at most <see cref="BatchSize"/> rows per delete statement
    /// </summary>
    /// <param name="context">Quartz execution context; its cancellation token is flowed through</param>
    public async Task Execute(IJobExecutionContext context)
    {
        var cutoffDate = DateTime.UtcNow.AddDays(-RetentionDays);

        _logger.LogInformation(
            "Cleaning up outbox messages processed before {CutoffDate}",
            cutoffDate);

        var totalDeleted = 0;
        int deletedInBatch;

        do
        {
            deletedInBatch = await _dbContext.OutboxMessages
                .Where(m => m.ProcessedOnUtc != null)
                .Where(m => m.ProcessedOnUtc < cutoffDate)
                .Take(BatchSize)
                .ExecuteDeleteAsync(context.CancellationToken);

            totalDeleted += deletedInBatch;

            // A full batch means more rows may remain; a short batch means we are done
        } while (deletedInBatch == BatchSize);

        _logger.LogInformation(
            "Cleaned up {Count} old outbox messages",
            totalDeleted);
    }
}

Template: Registration

// src/{name}.infrastructure/DependencyInjection.cs

/// <summary>
/// Registers Quartz with a persistent PostgreSQL job store, the Quartz hosted
/// service, and the option setups that add the outbox jobs
/// </summary>
/// <param name="services">Service collection to register into</param>
/// <param name="configuration">Configuration providing the "Database" connection string</param>
private static void AddBackgroundJobs(
    IServiceCollection services,
    IConfiguration configuration)
{
    // Fail fast with a clear message instead of handing a null connection string to Quartz
    var connectionString = configuration.GetConnectionString("Database")
        ?? throw new InvalidOperationException(
            "Connection string 'Database' is not configured.");

    services.AddQuartz(configure =>
    {
        // Use persistent job store for production
        configure.UsePersistentStore(options =>
        {
            options.UsePostgres(connectionString);
            options.UseJsonSerializer();
        });
    });

    services.AddQuartzHostedService(options =>
    {
        // Let running jobs finish before the host shuts down
        options.WaitForJobsToComplete = true;
    });

    // Register job configurations
    // NOTE(review): CleanupOutboxMessagesJobSetup is not shown alongside these templates;
    // presumably it mirrors ProcessOutboxMessagesJobSetup - confirm it exists.
    services.ConfigureOptions<ProcessOutboxMessagesJobSetup>();
    services.ConfigureOptions<CleanupOutboxMessagesJobSetup>();
}

Database Migration

-- Create outbox_message table
-- NOTE(review): TIMESTAMP is "without time zone"; recent Npgsql versions map UTC
-- DateTime values to "timestamp with time zone" by default - confirm which the app expects.
CREATE TABLE outbox_message (
    id UUID PRIMARY KEY,
    type VARCHAR(500) NOT NULL,
    content JSONB NOT NULL,
    occurred_on_utc TIMESTAMP NOT NULL,
    processed_on_utc TIMESTAMP NULL,
    error TEXT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0
);

-- Index for unprocessed messages (most important)
CREATE INDEX ix_outbox_message_unprocessed
    ON outbox_message (occurred_on_utc)
    WHERE processed_on_utc IS NULL;

-- Index for cleanup of old messages
CREATE INDEX ix_outbox_message_processed
    ON outbox_message (processed_on_utc)
    WHERE processed_on_utc IS NOT NULL;

-- Optional: Consumer tracking table for idempotency
CREATE TABLE outbox_message_consumer (
    id UUID PRIMARY KEY,
    event_id UUID NOT NULL,
    handler_name VARCHAR(500) NOT NULL,
    processed_on_utc TIMESTAMP NOT NULL
);

-- One row per (event, handler) pair
CREATE UNIQUE INDEX ix_outbox_consumer_event_handler
    ON outbox_message_consumer (event_id, handler_name);

Critical Rules

  • Same transaction - Events saved with aggregate in one transaction

  • Idempotent handlers - Must handle duplicate delivery

  • Order not guaranteed - Events may process out of order

  • Retry with backoff - Don't hammer failing events

  • Cleanup old messages - Prevent table bloat

  • Monitor failures - Alert on max retries exceeded

  • Type serialization - Store the AssemblyQualifiedName so the event type can be resolved during deserialization

  • JSON serialization - Consistent options for serialize/deserialize

  • Batch processing - Fetch a bounded batch per run; never load the whole backlog at once

  • Disable concurrent execution - Prevent duplicate processing

Anti-Patterns to Avoid

// ❌ WRONG: Publishing events directly (not reliable)
await _publisher.Publish(new UserCreatedEvent(user.Id));
await _unitOfWork.SaveChangesAsync(); // If this save fails, handlers already acted on state that was never persisted!

// ✅ CORRECT: Events converted to outbox in SaveChanges
user.RaiseDomainEvent(new UserCreatedEvent(user.Id));
await _unitOfWork.SaveChangesAsync(); // Events saved atomically

// ❌ WRONG: Non-idempotent handler
public async Task Handle(UserCreatedEvent e, CancellationToken ct)
{
    await _emailService.SendWelcomeEmail(e.UserId); // Sent twice on retry!
}

// ✅ CORRECT: Idempotent handler
public async Task Handle(UserCreatedEvent e, CancellationToken ct)
{
    // Check the log first so a redelivered event does not send a second email
    if (await _emailLog.ExistsAsync(e.UserId, "welcome"))
    {
        return; // Already sent
    }

    await _emailService.SendWelcomeEmail(e.UserId);
    await _emailLog.RecordAsync(e.UserId, "welcome");
}

// ❌ WRONG: Processing one message at a time
foreach (var message in allMessages) // Could be millions!
{
    await ProcessMessage(message);
}

// ✅ CORRECT: Batch with limit
var messages = await _dbContext.OutboxMessages
    .Where(m => m.ProcessedOnUtc == null)
    .OrderBy(m => m.OccurredOnUtc) // Oldest first, so the limited batch is deterministic
    .Take(20) // Batch size
    .ToListAsync();

Related Skills

  • domain-events-generator - Domain events that go into outbox

  • quartz-background-jobs - Background job scheduling

  • dotnet-clean-architecture - Infrastructure layer setup

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

dotnet-clean-architecture

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

unit-testing

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

repository-pattern

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

permission-authorization

No summary provided by upstream source.

Repository SourceNeeds Review