From 452e3b1e32bd002f7beebd3553bdb106b18aa017 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B2=88=E6=98=9F=E7=B9=81?= Date: Thu, 10 Aug 2023 18:18:34 +0800 Subject: [PATCH] feat: add circuit breaker for downgrade --- .../EventBusOptions.cs | 23 +++++- .../PublishIntegrationEventHostedService.cs | 54 ++++++++++--- .../IntegrationEventPublishTests.cs | 79 +++++++++++++++++++ 3 files changed, 140 insertions(+), 16 deletions(-) diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs index 82b5289..4054890 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs @@ -8,12 +8,27 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions; public class EventBusOptions { /// - /// The service collection for + /// Interval for publish integration event. Defaults to 1000ms. /// - public IServiceCollection? Services { get; set; } + public int Interval { get; set; } = 1000; /// - /// Interval for publish integration event. + /// Maximum number of events that can be sent in one cycle. Pass null to disable limit. Defaults to null. /// - public int Interval { get; set; } = 1; + public int? MaximumBatchSize { get; set; } + + /// + /// The maximum number of failure before downgrade. Defaults to 5. + /// + public int FailureCountBeforeDowngrade { get; set; } = 5; + + /// + /// Interval when downgraded. Defaults to 60000ms(1min). + /// + public int DowngradeInterval { get; set; } = 60 * 1000; + + /// + /// The maximum number of success before recover. Defaults to 1. + /// + public int SuccessCountBeforeRecover { get; set; } = 1; } diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs index 5ce0c1a..b760d53 100644 --- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs +++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs @@ -38,53 +38,83 @@ public PublishIntegrationEventHostedService( /// protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - _logger.LogInformation("Integration event publisher running."); + _logger.LogInformation("Integration event publisher running"); var watch = new Stopwatch(); - using var timer = new PeriodicTimer(TimeSpan.FromMicroseconds(_options.Interval)); - while (await timer.WaitForNextTickAsync(stoppingToken)) + var failureCounter = 0; + var successCounter = 0; + using var normalTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.Interval)); + using var failedTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.DowngradeInterval)); + var currentTimer = normalTimer; + var downgraded = false; + while (await currentTimer.WaitForNextTickAsync(stoppingToken)) { try { watch.Restart(); - var beforeCount = _eventBuffer.Count; - await PublishEventAsync(); + var sent = await PublishEventAsync(); watch.Stop(); var afterCount = _eventBuffer.Count; - if (afterCount - beforeCount > 0) + if (sent > 0) { + successCounter++; _logger.LogInformation( "Published {PublishedEventCount} events in {Duration} ms, resting count: {RestingEventCount}", - beforeCount - afterCount, + sent, watch.ElapsedMilliseconds, afterCount); } } catch (Exception e) { - _logger.LogError(e, "Publish integration event failed, pending count: {Count}", _eventBuffer.Count); + failureCounter++; + _logger.LogWarning( + e, + "Publish integration event failed, pending count: {Count}, failure count: {FailureCount}", + _eventBuffer.Count, + failureCounter); + } + + if (downgraded == false && failureCounter >= _options.FailureCountBeforeDowngrade) + { + _logger.LogError("Integration event publisher downgraded"); + downgraded = true; + currentTimer = failedTimer; + successCounter = 0; + } + + if (downgraded && successCounter > _options.SuccessCountBeforeRecover) + { + downgraded = false; + currentTimer = normalTimer; + failureCounter = 0; + _logger.LogWarning("Integration event publisher recovered from downgrade"); } } } - private async Task PublishEventAsync() + private async Task PublishEventAsync() { if (_eventBuffer.Count == 0) { - return; + return 0; } using var scope = _serviceProvider.CreateScope(); var provider = scope.ServiceProvider.GetRequiredService(); - while (_eventBuffer.Count > 0) + var publishedEventCount = 0; + while (_eventBuffer.Count > 0 && publishedEventCount != _options.MaximumBatchSize) { var buffered = _eventBuffer.Peek(); if (buffered is null) { - return; + break; } await provider.PublishAsync(buffered.Name, buffered.Event); _eventBuffer.Pop(); + publishedEventCount++; } + + return publishedEventCount; } } diff --git a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs index 979f39a..8cdc7c0 100644 --- a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs +++ b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs @@ -42,4 +42,83 @@ public async Task EventBus_PublishEvent_SuccessAsync() x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)), Times.Once); } + + [Fact] + public async Task EventBus_Downgrading_DowngradeAsync() + { + // Arrange + const string data = "hello"; + var builder = new WebApplicationFactory(); + var eventBusMock = new Mock(); + builder = builder.WithWebHostBuilder( + b => b.ConfigureServices( + services => + { + services.RemoveAll(); + services.AddScoped(_ => eventBusMock.Object); + services.Configure( + o => + { + o.FailureCountBeforeDowngrade = 1; + o.DowngradeInterval = 3000; + }); + })); + eventBusMock.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException()); + + // Act + var response = await builder.CreateClient().PostAsJsonAsync( + "/api/v1/strings", + new CreatePayload(false, data)); + var content = await response.Content.ReadAsStringAsync(); + await Task.Delay(3000); // hit at 1000ms and 3000ms + + // Assert + response.Should().BeSuccessful(); + content.Should().BeNullOrEmpty(); + eventBusMock.Verify( + x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)), + Times.Exactly(2)); + } + + [Fact] + public async Task EventBus_DowngradeThenRecover_RecoverAsync() + { + // Arrange + const string data = "hello"; + var builder = new WebApplicationFactory(); + var eventBusMock = new Mock(); + builder = builder.WithWebHostBuilder( + b => b.ConfigureServices( + services => + { + services.RemoveAll(); + services.AddScoped(_ => eventBusMock.Object); + services.Configure( + o => + { + o.FailureCountBeforeDowngrade = 1; + o.DowngradeInterval = 4000; + }); + })); + eventBusMock.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny())) + .ThrowsAsync(new InvalidOperationException()); + await builder.CreateClient().PostAsJsonAsync( + "/api/v1/strings", + new CreatePayload(false, data)); + await Task.Delay(1000); // failed, now it is downgraded + + // Act + eventBusMock.Reset(); + await Task.Delay(2000); // recover + await builder.CreateClient().PostAsJsonAsync( + "/api/v1/strings", + new CreatePayload(false, data)); + await Task.Delay(1000); + + // Assert + eventBusMock.Verify( + x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)), + Times.Exactly(2)); + } }