Distributed Events Advanced Patterns

Master advanced distributed event patterns for building resilient, scalable ABP microservices architectures.

When to Use This Skill

  • Implementing event handlers that process events from other microservices

  • Building idempotent event processing to handle duplicates

  • Cross-tenant event synchronization

  • Designing event-driven communication patterns

  • Implementing saga/choreography patterns

  • Troubleshooting event delivery issues

Core Concepts

Event Transfer Objects (ETOs)

ETOs are the payload of distributed events. Define them in a shared library accessible by both publisher and subscriber.

// Shared/Etos/PatientCreatedEto.cs
[EventName("patient.created")] // Optional: explicit event name
public class PatientCreatedEto
{
    public Guid Id { get; set; }
    public Guid? TenantId { get; set; }
    public string Name { get; set; }
    public string Email { get; set; }
    public DateTime CreatedAt { get; set; }

    // Include correlation ID for tracing
    public string CorrelationId { get; set; }
}

ETO Best Practices:

  • Include TenantId for multi-tenant scenarios

  • Include CorrelationId for distributed tracing

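  • Use only simple, serializable types (primitives, strings, Guid, DateTime, plain DTOs); never expose entities or navigation properties

  • Version ETOs additively: add new optional properties, and never remove or rename existing ones, so that older subscribers can still deserialize the payload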
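
The versioning rule can be illustrated with a short sketch. It assumes events are serialized as JSON with System.Text.Json; the `PatientCreatedEtoV2` type, its `Phone` property, and the `EtoVersioningDemo` class are hypothetical names introduced only for this example.

```csharp
using System;
using System.Text.Json;

// Version 2 of the ETO: Phone is new and optional, so payloads written
// by older publishers (without Phone) still deserialize.
public class PatientCreatedEtoV2
{
    public Guid Id { get; set; }
    public string Name { get; set; }
    public string Phone { get; set; } // hypothetical property added in v2
}

public static class EtoVersioningDemo
{
    // Reads a payload produced before Phone existed.
    public static PatientCreatedEtoV2 ReadOldPayload()
    {
        const string oldJson =
            "{\"Id\":\"3f2504e0-4f89-11d3-9a0c-0305e82c3301\",\"Name\":\"John Doe\"}";
        return JsonSerializer.Deserialize<PatientCreatedEtoV2>(oldJson);
    }

    // Reads a payload from a newer publisher that sends a field this
    // subscriber does not know; unknown members are ignored by default.
    public static PatientCreatedEtoV2 ReadNewerPayload()
    {
        const string newerJson =
            "{\"Id\":\"3f2504e0-4f89-11d3-9a0c-0305e82c3301\",\"Name\":\"John Doe\",\"Nickname\":\"JD\"}";
        return JsonSerializer.Deserialize<PatientCreatedEtoV2>(newerJson);
    }
}
```

Both directions work because nothing was removed or renamed: a missing property falls back to its default value, and an extra property is skipped.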

Event Handler Patterns

  1. Basic Event Handler

public class PatientCreatedEventHandler
    : IDistributedEventHandler<PatientCreatedEto>, ITransientDependency
{
    private readonly ILogger<PatientCreatedEventHandler> _logger;

    public PatientCreatedEventHandler(
        ILogger<PatientCreatedEventHandler> logger)
    {
        _logger = logger;
    }

    public async Task HandleEventAsync(PatientCreatedEto eventData)
    {
        _logger.LogInformation(
            "Processing PatientCreated event: {PatientId}",
            eventData.Id);

        // Handle the event
        await ProcessPatientAsync(eventData);
    }
}

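  2. Idempotent Event Handler

Message brokers typically deliver at least once, so a handler can receive the same event more than once. Handle duplicates gracefully by recording an idempotency key for every processed event. The check-then-insert in this handler is safe under concurrent delivery only if EventKey has a unique index, so that a racing duplicate fails on insert instead of being applied twice.

public class PatientSyncEventHandler
    : IDistributedEventHandler<PatientCreatedEto>, ITransientDependency
{
private readonly IRepository<Patient, Guid> _repository;
private readonly IRepository<ProcessedEvent, Guid> _processedEventRepository;
private readonly ILogger<PatientSyncEventHandler> _logger;

// Exposed under this name so the GuidGenerator.Create() call in the handler resolves
private IGuidGenerator GuidGenerator { get; }

public PatientSyncEventHandler(
    IRepository<Patient, Guid> repository,
    IRepository<ProcessedEvent, Guid> processedEventRepository,
    ILogger<PatientSyncEventHandler> logger,
    IGuidGenerator guidGenerator)
{
    _repository = repository;
    _processedEventRepository = processedEventRepository;
    _logger = logger;
    GuidGenerator = guidGenerator;
}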

public async Task HandleEventAsync(PatientCreatedEto eto)
{
    // Idempotency check using event ID or correlation ID
    var eventKey = $"PatientCreated:{eto.Id}";

    if (await _processedEventRepository.AnyAsync(x => x.EventKey == eventKey))
    {
        _logger.LogInformation(
            "Event already processed, skipping: {EventKey}", eventKey);
        return;
    }

    try
    {
        // Check if entity already exists
        var existing = await _repository.FirstOrDefaultAsync(
            x => x.ExternalId == eto.Id);

        if (existing != null)
        {
            _logger.LogInformation(
                "Patient already exists, updating: {PatientId}", eto.Id);
            existing.SetName(eto.Name);
            await _repository.UpdateAsync(existing);
        }
        else
        {
            var patient = new Patient(
                GuidGenerator.Create(),
                eto.Name,
                eto.Email)
            {
                ExternalId = eto.Id
            };
            await _repository.InsertAsync(patient);
        }

        // Record processed event
        await _processedEventRepository.InsertAsync(new ProcessedEvent
        {
            EventKey = eventKey,
            ProcessedAt = DateTime.UtcNow
        });

        _logger.LogInformation(
            "Successfully processed PatientCreated: {PatientId}", eto.Id);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,
            "Failed to process PatientCreated: {PatientId}", eto.Id);
        throw; // Re-throw to trigger retry
    }
}

}

// Entity for tracking processed events
public class ProcessedEvent : Entity<Guid>
{
    public string EventKey { get; set; }
    public DateTime ProcessedAt { get; set; }
}
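
To make the idea of an idempotency key concrete, here is a minimal in-memory sketch of an atomic "claim" on a key. It is not ABP code: `InMemoryIdempotencyStore`, `TryClaim`, and `IdempotencyDemo` are hypothetical names, and a real handler would rely on a unique database index on EventKey rather than process memory.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical in-memory stand-in for a table with a unique index on EventKey.
public class InMemoryIdempotencyStore
{
    private readonly ConcurrentDictionary<string, DateTime> _processed = new();

    // Returns true only for the first caller that claims the key.
    // TryAdd is atomic, so two concurrent deliveries cannot both win.
    public bool TryClaim(string eventKey) =>
        _processed.TryAdd(eventKey, DateTime.UtcNow);
}

public static class IdempotencyDemo
{
    // Simulates delivering the same event twice and counts how often
    // the side effect actually runs.
    public static int DeliverTwice(Guid patientId)
    {
        var store = new InMemoryIdempotencyStore();
        var sideEffects = 0;
        var eventKey = $"PatientCreated:{patientId}";

        for (var delivery = 0; delivery < 2; delivery++)
        {
            if (store.TryClaim(eventKey))
            {
                sideEffects++; // first delivery: do the real work
            }
            // second delivery: key already claimed, nothing to do
        }

        return sideEffects;
    }
}
```

The design point is that the check and the record happen in one atomic step, which is what a unique index gives a database-backed handler.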

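  3. Cross-Tenant Event Handler

Handle events that need to operate across tenant boundaries.

public class EntitySyncEventHandler
    : IDistributedEventHandler<EntityUpdatedEto>, ITransientDependency
{
private readonly IRepository<Entity, Guid> _repository;
private readonly IDataFilter _dataFilter;
private readonly ICurrentTenant _currentTenant;
private readonly ILogger<EntitySyncEventHandler> _logger;

// Exposed under this name so the GuidGenerator.Create() call in SyncEntityAsync resolves
private IGuidGenerator GuidGenerator { get; }

public EntitySyncEventHandler(
    IRepository<Entity, Guid> repository,
    IDataFilter dataFilter,
    ICurrentTenant currentTenant,
    ILogger<EntitySyncEventHandler> logger,
    IGuidGenerator guidGenerator)
{
    _repository = repository;
    _dataFilter = dataFilter;
    _currentTenant = currentTenant;
    _logger = logger;
    GuidGenerator = guidGenerator;
}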

public async Task HandleEventAsync(EntityUpdatedEto eto)
{
    _logger.LogInformation(
        "Processing cross-tenant sync: {EntityId}, TargetTenant: {TenantId}",
        eto.Id, eto.TenantId);

    // Pick ONE of the two options; running both would sync the entity twice.

    // Option 1: Disable the tenant filter completely
    // using (_dataFilter.Disable<IMultiTenant>())
    // {
    //     await SyncEntityAsync(eto);
    // }

    // Option 2: Switch to the tenant named in the event
    using (_currentTenant.Change(eto.TenantId))
    {
        await SyncEntityAsync(eto);
    }
}

private async Task SyncEntityAsync(EntityUpdatedEto eto)
{
    var existing = await _repository.FirstOrDefaultAsync(
        x => x.ExternalId == eto.ExternalId);

    if (existing != null)
    {
        // Update existing
        existing.Update(eto.Name, eto.Value);
        await _repository.UpdateAsync(existing);
    }
    else
    {
        // Create new
        var entity = new Entity(
            GuidGenerator.Create(),
            eto.Name,
            eto.Value)
        {
            TenantId = eto.TenantId,
            ExternalId = eto.ExternalId
        };
        await _repository.InsertAsync(entity);
    }
}

}

  4. Handler with Business Logic Delegation

Separate handler concerns from business logic for testability.

// Event Handler - thin, handles infrastructure concerns
public class LicensePlateAllocatedEventHandler
    : IDistributedEventHandler<LicensePlateAllocatedEto>, ITransientDependency
{
private readonly ILicensePlateEventService _eventService;
private readonly ILogger<LicensePlateAllocatedEventHandler> _logger;

public LicensePlateAllocatedEventHandler(
    ILicensePlateEventService eventService,
    ILogger<LicensePlateAllocatedEventHandler> logger)
{
    _eventService = eventService;
    _logger = logger;
}

public async Task HandleEventAsync(LicensePlateAllocatedEto eto)
{
    try
    {
        _logger.LogInformation(
            "[{Handler}] HandleEventAsync - Started - LicensePlateId: {Id}",
            nameof(LicensePlateAllocatedEventHandler), eto.LicensePlateId);

        await _eventService.ProcessAllocationAsync(eto);

        _logger.LogInformation(
            "[{Handler}] HandleEventAsync - Completed - LicensePlateId: {Id}",
            nameof(LicensePlateAllocatedEventHandler), eto.LicensePlateId);
    }
    catch (Exception ex)
    {
        _logger.LogError(ex,
            "[{Handler}] HandleEventAsync - Failed - LicensePlateId: {Id}",
            nameof(LicensePlateAllocatedEventHandler), eto.LicensePlateId);
        throw; // Rethrow the original exception so its type and stack trace survive and the bus can retry
    }
}

}

// Service - contains business logic, easily testable
public interface ILicensePlateEventService
{
    Task ProcessAllocationAsync(LicensePlateAllocatedEto eto);
}

public class LicensePlateEventService : ApplicationService, ILicensePlateEventService
{
private readonly IRepository<LicensePlate, Guid> _repository;

public async Task ProcessAllocationAsync(LicensePlateAllocatedEto eto)
{
    var licensePlate = await _repository.GetAsync(eto.LicensePlateId);
    licensePlate.MarkAsAllocated(eto.AllocatedTo, eto.AllocatedAt);
    await _repository.UpdateAsync(licensePlate);
}

}

Publishing Events

  1. From Application Service

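public class PatientAppService : ApplicationService, IPatientAppService
{
    private readonly IRepository<Patient, Guid> _patientRepository;
    private readonly IDistributedEventBus _eventBus;
    private readonly ICorrelationIdProvider _correlationIdProvider; // Volo.Abp.Tracing

    public PatientAppService(
        IRepository<Patient, Guid> patientRepository,
        IDistributedEventBus eventBus,
        ICorrelationIdProvider correlationIdProvider)
    {
        _patientRepository = patientRepository;
        _eventBus = eventBus;
        _correlationIdProvider = correlationIdProvider;
    }

    public async Task<PatientDto> CreateAsync(CreatePatientDto input)
    {
        var patient = new Patient(GuidGenerator.Create(), input.Name, input.Email);
        await _patientRepository.InsertAsync(patient);

        // Publish event after successful creation
        await _eventBus.PublishAsync(new PatientCreatedEto
        {
            Id = patient.Id,
            TenantId = CurrentTenant.Id,
            Name = patient.Name,
            Email = patient.Email,
            CreatedAt = patient.CreationTime,
            CorrelationId = _correlationIdProvider.Get()
        });

        return ObjectMapper.Map<Patient, PatientDto>(patient);
    }
}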

  2. From Domain Entity (Aggregate Root)

public class Patient : FullAuditedAggregateRoot<Guid>
{
public string Name { get; private set; }
public string Email { get; private set; }
public bool IsActive { get; private set; }

public void Activate()
{
    if (IsActive)
    {
        throw new BusinessException("Patient is already active");
    }

    IsActive = true;

    // Domain event - published when UoW completes
    AddDistributedEvent(new PatientActivatedEto
    {
        Id = Id,
        Name = Name,
        Email = Email,
        ActivatedAt = DateTime.UtcNow
    });
}

}

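  3. Outbox Pattern (Transactional Events)

Ensure events are published atomically with database changes. With the outbox enabled, an event is saved in the same database transaction as the data change and is sent to the broker afterwards by a background worker.

// Configure in module
// MyProjectDbContext is a placeholder for the DbContext that stores outgoing events;
// per the ABP documentation it must also implement IHasEventOutbox and call
// builder.ConfigureEventOutbox() in OnModelCreating.
public override void ConfigureServices(ServiceConfigurationContext context)
{
    Configure<AbpDistributedEventBusOptions>(options =>
    {
        options.Outboxes.Configure(config =>
        {
            config.UseDbContext<MyProjectDbContext>(); // Enable outbox
        });
    });
}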

Advanced Patterns

  1. Event Retry with Exponential Backoff

public class ResilientEventHandler
    : IDistributedEventHandler<ImportantEventEto>, ITransientDependency
{
private const int MaxRetries = 3;
private readonly ILogger<ResilientEventHandler> _logger;

public async Task HandleEventAsync(ImportantEventEto eto)
{
    var retryCount = 0;
    Exception lastException = null;

    while (retryCount < MaxRetries)
    {
        try
        {
            await ProcessEventAsync(eto);
            return; // Success
        }
        catch (Exception ex) when (IsTransientError(ex))
        {
            lastException = ex;
            retryCount++;

            var delay = TimeSpan.FromSeconds(Math.Pow(2, retryCount));
            _logger.LogWarning(
                "Retry {RetryCount}/{MaxRetries} after {Delay}s for event {EventId}",
                retryCount, MaxRetries, delay.TotalSeconds, eto.Id);

            await Task.Delay(delay);
        }
    }

    _logger.LogError(lastException,
        "Failed to process event after {MaxRetries} retries: {EventId}",
        MaxRetries, eto.Id);
    throw lastException;
}

private bool IsTransientError(Exception ex) =>
    ex is DbUpdateConcurrencyException ||
    ex is TimeoutException ||
    ex is HttpRequestException;

}
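
The delay schedule used by this handler (2^n seconds) can be checked in isolation. The sketch below is a hypothetical helper, not part of ABP: `BackoffSchedule`, `DelayFor`, and the `maxDelay` cap are assumptions added for illustration, since an uncapped exponential grows quickly if MaxRetries is raised.

```csharp
using System;

public static class BackoffSchedule
{
    // Delay before retry number `attempt` (1-based): 2^attempt seconds,
    // capped at maxDelay so large attempt counts stay bounded.
    public static TimeSpan DelayFor(int attempt, TimeSpan maxDelay)
    {
        if (attempt < 1)
        {
            throw new ArgumentOutOfRangeException(nameof(attempt));
        }

        var seconds = Math.Pow(2, attempt);
        return seconds >= maxDelay.TotalSeconds
            ? maxDelay
            : TimeSpan.FromSeconds(seconds);
    }
}
```

With a 30-second cap, attempts 1 to 3 wait 2, 4, and 8 seconds, matching the handler, while attempt 10 waits 30 seconds rather than 1024.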

  2. Saga/Choreography Pattern

Coordinate multi-step processes across services.

// Step 1: Order Service publishes OrderCreated
public class OrderAppService : ApplicationService
{
// Constructor injection omitted for brevity
private readonly IRepository<Order, Guid> _orderRepository;
private readonly IDistributedEventBus _eventBus;

public async Task<OrderDto> CreateAsync(CreateOrderDto input)
{
    var order = new Order(GuidGenerator.Create(), input.CustomerId);
    await _orderRepository.InsertAsync(order);

    await _eventBus.PublishAsync(new OrderCreatedEto
    {
        OrderId = order.Id,
        CustomerId = input.CustomerId,
        Items = input.Items
    });

    return ObjectMapper.Map<Order, OrderDto>(order);
}

}

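// Step 2: Inventory Service handles OrderCreated
public class OrderCreatedHandler
    : IDistributedEventHandler<OrderCreatedEto>, ITransientDependency
{
// Constructor injection omitted for brevity
private readonly IDistributedEventBus _eventBus;

public async Task HandleEventAsync(OrderCreatedEto eto)
{
    try
    {
        await ReserveInventoryAsync(eto.Items);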

        // Publish success event
        await _eventBus.PublishAsync(new InventoryReservedEto
        {
            OrderId = eto.OrderId,
            ReservedAt = DateTime.UtcNow
        });
    }
    catch (InsufficientInventoryException ex)
    {
        // Publish failure event for compensation
        await _eventBus.PublishAsync(new InventoryReservationFailedEto
        {
            OrderId = eto.OrderId,
            Reason = ex.Message
        });
    }
}

}

// Step 3: Order Service handles compensation
public class InventoryReservationFailedHandler
    : IDistributedEventHandler<InventoryReservationFailedEto>, ITransientDependency
{
// Constructor injection omitted for brevity
private readonly IRepository<Order, Guid> _orderRepository;
private readonly IDistributedEventBus _eventBus;

public async Task HandleEventAsync(InventoryReservationFailedEto eto)
{
    var order = await _orderRepository.GetAsync(eto.OrderId);
    order.Cancel($"Inventory reservation failed: {eto.Reason}");
    await _orderRepository.UpdateAsync(order);

    // Notify customer
    await _eventBus.PublishAsync(new OrderCancelledEto
    {
        OrderId = eto.OrderId,
        Reason = eto.Reason
    });
}

}
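
The three steps imply a small state machine for each order. The sketch below makes those transitions explicit in plain C#. `OrderSagaState`, `SagaEvent`, and `OrderSaga.Next` are hypothetical names that do not appear in the services shown here, and a real implementation would keep the state on the Order aggregate.

```csharp
using System;

public enum OrderSagaState { Created, InventoryReserved, Cancelled }

public enum SagaEvent { InventoryReserved, InventoryReservationFailed }

public static class OrderSaga
{
    // Pure transition function: given the current state and an incoming
    // event, return the next state. Repeated or late events leave the
    // state unchanged, which keeps the saga tolerant of redelivery.
    public static OrderSagaState Next(OrderSagaState current, SagaEvent evt) =>
        (current, evt) switch
        {
            (OrderSagaState.Created, SagaEvent.InventoryReserved)
                => OrderSagaState.InventoryReserved,
            (OrderSagaState.Created, SagaEvent.InventoryReservationFailed)
                => OrderSagaState.Cancelled, // compensation path
            _ => current // duplicate or out-of-order event: ignore
        };
}
```

Keeping the transitions in one pure function makes the compensation rules easy to unit test without a message broker.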

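  3. Event Aggregation

Batch multiple events for efficiency. The buffer lives in process memory, so events below the threshold are lost on restart and are not shared between service instances; use this only for updates that can tolerate that.

public class BatchEventHandler
    : IDistributedEventHandler<ItemUpdatedEto>, ITransientDependency
{
    private const int BatchSize = 100;

    // Static because the handler is transient: the buffer must outlive each instance
    private static readonly Dictionary<Guid, List<ItemUpdatedEto>> _batches = new();
    private static readonly object _sync = new();

    public async Task HandleEventAsync(ItemUpdatedEto eto)
    {
        var batchKey = eto.TenantId ?? Guid.Empty;
        List<ItemUpdatedEto> fullBatch = null;

        // List<T> is not thread-safe, so add and detach under one lock
        lock (_sync)
        {
            if (!_batches.TryGetValue(batchKey, out var batch))
            {
                batch = new List<ItemUpdatedEto>();
                _batches[batchKey] = batch;
            }

            batch.Add(eto);

            // Detach the batch when the threshold is reached
            if (batch.Count >= BatchSize)
            {
                _batches.Remove(batchKey);
                fullBatch = batch;
            }
        }

        // Process outside the lock so other events are not blocked
        if (fullBatch != null)
        {
            await ProcessBatchAsync(fullBatch);
        }
    }
}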

Error Handling Patterns

Dead Letter Queue Handling

public class DeadLetterEventHandler
    : IDistributedEventHandler<DeadLetterEvent>, ITransientDependency
{
private readonly IRepository<FailedEvent, Guid> _failedEventRepository;
private readonly ILogger<DeadLetterEventHandler> _logger;
private readonly INotificationService _notificationService; // hypothetical alerting service

public async Task HandleEventAsync(DeadLetterEvent eto)
{
    _logger.LogWarning(
        "Event moved to dead letter queue: {EventType}, Error: {Error}",
        eto.OriginalEventType, eto.ErrorMessage);

    await _failedEventRepository.InsertAsync(new FailedEvent
    {
        EventType = eto.OriginalEventType,
        EventData = eto.OriginalEventData,
        ErrorMessage = eto.ErrorMessage,
        FailedAt = DateTime.UtcNow,
        RetryCount = eto.RetryCount
    });

    // Notify operations team
    await _notificationService.SendAlertAsync(
        "Event Processing Failed",
        $"Event {eto.OriginalEventType} failed after {eto.RetryCount} retries");
}

}

Testing Event Handlers

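public class PatientCreatedEventHandlerTests : ApplicationTestBase
{
// These tests exercise the idempotent handler, which creates and updates patients
private readonly PatientSyncEventHandler _handler;
private readonly IRepository<Patient, Guid> _repository;

public PatientCreatedEventHandlerTests()
{
    // Assumes ApplicationTestBase derives from ABP's integrated test base,
    // which provides GetRequiredService<T>()
    _handler = GetRequiredService<PatientSyncEventHandler>();
    _repository = GetRequiredService<IRepository<Patient, Guid>>();
}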

[Fact]
public async Task HandleEventAsync_CreatesPatient_WhenNotExists()
{
    // Arrange
    var eto = new PatientCreatedEto
    {
        Id = Guid.NewGuid(),
        Name = "John Doe",
        Email = "john@example.com"
    };

    // Act
    await _handler.HandleEventAsync(eto);

    // Assert
    var patient = await _repository.FirstOrDefaultAsync(
        x => x.ExternalId == eto.Id);
    patient.ShouldNotBeNull();
    patient.Name.ShouldBe("John Doe");
}

[Fact]
public async Task HandleEventAsync_UpdatesPatient_WhenExists()
{
    // Arrange
    var existingId = Guid.NewGuid();
    await _repository.InsertAsync(new Patient(
        Guid.NewGuid(), "Old Name", "old@example.com")
    {
        ExternalId = existingId
    });

    var eto = new PatientCreatedEto
    {
        Id = existingId,
        Name = "New Name",
        Email = "new@example.com"
    };

    // Act
    await _handler.HandleEventAsync(eto);

    // Assert
    var patient = await _repository.FirstOrDefaultAsync(
        x => x.ExternalId == existingId);
    patient.Name.ShouldBe("New Name");
}

[Fact]
public async Task HandleEventAsync_IsIdempotent()
{
    // Arrange
    var eto = new PatientCreatedEto
    {
        Id = Guid.NewGuid(),
        Name = "John Doe"
    };

    // Act - process same event twice
    await _handler.HandleEventAsync(eto);
    await _handler.HandleEventAsync(eto);

    // Assert - only one patient created
    var count = await _repository.CountAsync(x => x.ExternalId == eto.Id);
    count.ShouldBe(1);
}

}

Configuration

RabbitMQ Configuration

public override void ConfigureServices(ServiceConfigurationContext context)
{
    var configuration = context.Services.GetConfiguration();

    Configure<AbpRabbitMqEventBusOptions>(options =>
    {
        options.ClientName = "MyService";
        options.ExchangeName = "MyApp";
    });

    // MyProjectDbContext is a placeholder for the DbContext that holds the
    // outbox and inbox tables (it implements IHasEventOutbox and IHasEventInbox)
    Configure<AbpDistributedEventBusOptions>(options =>
    {
        options.Outboxes.Configure(config =>
        {
            config.UseDbContext<MyProjectDbContext>();
        });
        options.Inboxes.Configure(config =>
        {
            config.UseDbContext<MyProjectDbContext>();
        });
    });
}
