diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs
index 82b5289..4054890 100644
--- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs
+++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/EventBusOptions.cs
@@ -8,12 +8,27 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
public class EventBusOptions
{
///
- /// The service collection for
+ /// Interval for publish integration event. Defaults to 1000ms.
///
- public IServiceCollection? Services { get; set; }
+ public int Interval { get; set; } = 1000;
///
- /// Interval for publish integration event.
+ /// Maximum number of events that can be sent in one cycle. Pass null to disable limit. Defaults to null.
///
- public int Interval { get; set; } = 1;
+ public int? MaximumBatchSize { get; set; }
+
+ ///
+ /// The maximum number of failure before downgrade. Defaults to 5.
+ ///
+ public int FailureCountBeforeDowngrade { get; set; } = 5;
+
+ ///
+ /// Interval when downgraded. Defaults to 60000ms(1min).
+ ///
+ public int DowngradeInterval { get; set; } = 60 * 1000;
+
+ ///
+ /// The maximum number of success before recover. Defaults to 1.
+ ///
+ public int SuccessCountBeforeRecover { get; set; } = 1;
}
diff --git a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs
index 5ce0c1a..b760d53 100644
--- a/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs
+++ b/src/Cnblogs.Architecture.Ddd.EventBus.Abstractions/PublishIntegrationEventHostedService.cs
@@ -38,53 +38,83 @@ public PublishIntegrationEventHostedService(
///
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
- _logger.LogInformation("Integration event publisher running.");
+ _logger.LogInformation("Integration event publisher running");
var watch = new Stopwatch();
- using var timer = new PeriodicTimer(TimeSpan.FromMicroseconds(_options.Interval));
- while (await timer.WaitForNextTickAsync(stoppingToken))
+ var failureCounter = 0;
+ var successCounter = 0;
+ using var normalTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.Interval));
+ using var failedTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.DowngradeInterval));
+ var currentTimer = normalTimer;
+ var downgraded = false;
+ while (await currentTimer.WaitForNextTickAsync(stoppingToken))
{
try
{
watch.Restart();
- var beforeCount = _eventBuffer.Count;
- await PublishEventAsync();
+ var sent = await PublishEventAsync();
watch.Stop();
var afterCount = _eventBuffer.Count;
- if (afterCount - beforeCount > 0)
+ if (sent > 0)
{
+ successCounter++;
_logger.LogInformation(
"Published {PublishedEventCount} events in {Duration} ms, resting count: {RestingEventCount}",
- beforeCount - afterCount,
+ sent,
watch.ElapsedMilliseconds,
afterCount);
}
}
catch (Exception e)
{
- _logger.LogError(e, "Publish integration event failed, pending count: {Count}", _eventBuffer.Count);
+ failureCounter++;
+ _logger.LogWarning(
+ e,
+ "Publish integration event failed, pending count: {Count}, failure count: {FailureCount}",
+ _eventBuffer.Count,
+ failureCounter);
+ }
+
+ if (downgraded == false && failureCounter >= _options.FailureCountBeforeDowngrade)
+ {
+ _logger.LogError("Integration event publisher downgraded");
+ downgraded = true;
+ currentTimer = failedTimer;
+ successCounter = 0;
+ }
+
+ if (downgraded && successCounter > _options.SuccessCountBeforeRecover)
+ {
+ downgraded = false;
+ currentTimer = normalTimer;
+ failureCounter = 0;
+ _logger.LogWarning("Integration event publisher recovered from downgrade");
}
}
}
- private async Task PublishEventAsync()
+ private async Task PublishEventAsync()
{
if (_eventBuffer.Count == 0)
{
- return;
+ return 0;
}
using var scope = _serviceProvider.CreateScope();
var provider = scope.ServiceProvider.GetRequiredService();
- while (_eventBuffer.Count > 0)
+ var publishedEventCount = 0;
+ while (_eventBuffer.Count > 0 && publishedEventCount != _options.MaximumBatchSize)
{
var buffered = _eventBuffer.Peek();
if (buffered is null)
{
- return;
+ break;
}
await provider.PublishAsync(buffered.Name, buffered.Event);
_eventBuffer.Pop();
+ publishedEventCount++;
}
+
+ return publishedEventCount;
}
}
diff --git a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs
index 979f39a..8cdc7c0 100644
--- a/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs
+++ b/test/Cnblogs.Architecture.IntegrationTests/IntegrationEventPublishTests.cs
@@ -42,4 +42,83 @@ public async Task EventBus_PublishEvent_SuccessAsync()
x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)),
Times.Once);
}
+
+ [Fact]
+ public async Task EventBus_Downgrading_DowngradeAsync()
+ {
+ // Arrange
+ const string data = "hello";
+ var builder = new WebApplicationFactory();
+ var eventBusMock = new Mock();
+ builder = builder.WithWebHostBuilder(
+ b => b.ConfigureServices(
+ services =>
+ {
+ services.RemoveAll();
+ services.AddScoped(_ => eventBusMock.Object);
+ services.Configure(
+ o =>
+ {
+ o.FailureCountBeforeDowngrade = 1;
+ o.DowngradeInterval = 3000;
+ });
+ }));
+ eventBusMock.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny()))
+ .ThrowsAsync(new InvalidOperationException());
+
+ // Act
+ var response = await builder.CreateClient().PostAsJsonAsync(
+ "/api/v1/strings",
+ new CreatePayload(false, data));
+ var content = await response.Content.ReadAsStringAsync();
+ await Task.Delay(3000); // hit at 1000ms and 3000ms
+
+ // Assert
+ response.Should().BeSuccessful();
+ content.Should().BeNullOrEmpty();
+ eventBusMock.Verify(
+ x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)),
+ Times.Exactly(2));
+ }
+
+ [Fact]
+ public async Task EventBus_DowngradeThenRecover_RecoverAsync()
+ {
+ // Arrange
+ const string data = "hello";
+ var builder = new WebApplicationFactory();
+ var eventBusMock = new Mock();
+ builder = builder.WithWebHostBuilder(
+ b => b.ConfigureServices(
+ services =>
+ {
+ services.RemoveAll();
+ services.AddScoped(_ => eventBusMock.Object);
+ services.Configure(
+ o =>
+ {
+ o.FailureCountBeforeDowngrade = 1;
+ o.DowngradeInterval = 4000;
+ });
+ }));
+ eventBusMock.Setup(x => x.PublishAsync(It.IsAny(), It.IsAny()))
+ .ThrowsAsync(new InvalidOperationException());
+ await builder.CreateClient().PostAsJsonAsync(
+ "/api/v1/strings",
+ new CreatePayload(false, data));
+ await Task.Delay(1000); // failed, now it is downgraded
+
+ // Act
+ eventBusMock.Reset();
+ await Task.Delay(2000); // recover
+ await builder.CreateClient().PostAsJsonAsync(
+ "/api/v1/strings",
+ new CreatePayload(false, data));
+ await Task.Delay(1000);
+
+ // Assert
+ eventBusMock.Verify(
+ x => x.PublishAsync(It.IsAny(), It.Is(t => t.Message == data)),
+ Times.Exactly(2));
+ }
}