Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,27 @@ namespace Cnblogs.Architecture.Ddd.EventBus.Abstractions;
public class EventBusOptions
{
/// <summary>
/// The service collection for
/// Interval for publish integration event. Defaults to 1000ms.
/// </summary>
public IServiceCollection? Services { get; set; }
public int Interval { get; set; } = 1000;

/// <summary>
/// Interval for publish integration event.
/// Maximum number of events that can be sent in one cycle. Pass <c>null</c> to disable limit. Defaults to <c>null</c>.
/// </summary>
public int Interval { get; set; } = 1;
public int? MaximumBatchSize { get; set; }

/// <summary>
/// The maximum number of failure before downgrade. Defaults to 5.
/// </summary>
public int FailureCountBeforeDowngrade { get; set; } = 5;

/// <summary>
/// Interval when downgraded. Defaults to 60000ms(1min).
/// </summary>
public int DowngradeInterval { get; set; } = 60 * 1000;

/// <summary>
/// The maximum number of success before recover. Defaults to 1.
/// </summary>
public int SuccessCountBeforeRecover { get; set; } = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,53 +38,83 @@ public PublishIntegrationEventHostedService(
/// <inheritdoc />
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Integration event publisher running.");
_logger.LogInformation("Integration event publisher running");
var watch = new Stopwatch();
using var timer = new PeriodicTimer(TimeSpan.FromMicroseconds(_options.Interval));
while (await timer.WaitForNextTickAsync(stoppingToken))
var failureCounter = 0;
var successCounter = 0;
using var normalTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.Interval));
using var failedTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(_options.DowngradeInterval));
var currentTimer = normalTimer;
var downgraded = false;
while (await currentTimer.WaitForNextTickAsync(stoppingToken))
{
try
{
watch.Restart();
var beforeCount = _eventBuffer.Count;
await PublishEventAsync();
var sent = await PublishEventAsync();
watch.Stop();
var afterCount = _eventBuffer.Count;
if (afterCount - beforeCount > 0)
if (sent > 0)
{
successCounter++;
_logger.LogInformation(
"Published {PublishedEventCount} events in {Duration} ms, resting count: {RestingEventCount}",
beforeCount - afterCount,
sent,
watch.ElapsedMilliseconds,
afterCount);
}
}
catch (Exception e)
{
_logger.LogError(e, "Publish integration event failed, pending count: {Count}", _eventBuffer.Count);
failureCounter++;
_logger.LogWarning(
e,
"Publish integration event failed, pending count: {Count}, failure count: {FailureCount}",
_eventBuffer.Count,
failureCounter);
}

if (downgraded == false && failureCounter >= _options.FailureCountBeforeDowngrade)
{
_logger.LogError("Integration event publisher downgraded");
downgraded = true;
currentTimer = failedTimer;
successCounter = 0;
}

if (downgraded && successCounter > _options.SuccessCountBeforeRecover)
{
downgraded = false;
currentTimer = normalTimer;
failureCounter = 0;
_logger.LogWarning("Integration event publisher recovered from downgrade");
}
}
}

private async Task PublishEventAsync()
private async Task<int> PublishEventAsync()
{
if (_eventBuffer.Count == 0)
{
return;
return 0;
}

using var scope = _serviceProvider.CreateScope();
var provider = scope.ServiceProvider.GetRequiredService<IEventBusProvider>();
while (_eventBuffer.Count > 0)
var publishedEventCount = 0;
while (_eventBuffer.Count > 0 && publishedEventCount != _options.MaximumBatchSize)
{
var buffered = _eventBuffer.Peek();
if (buffered is null)
{
return;
break;
}

await provider.PublishAsync(buffered.Name, buffered.Event);
_eventBuffer.Pop();
publishedEventCount++;
}

return publishedEventCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,83 @@ public async Task EventBus_PublishEvent_SuccessAsync()
x => x.PublishAsync(It.IsAny<string>(), It.Is<TestIntegrationEvent>(t => t.Message == data)),
Times.Once);
}

[Fact]
public async Task EventBus_Downgrading_DowngradeAsync()
{
// Arrange
const string data = "hello";
var builder = new WebApplicationFactory<Program>();
var eventBusMock = new Mock<IEventBusProvider>();
builder = builder.WithWebHostBuilder(
b => b.ConfigureServices(
services =>
{
services.RemoveAll<IEventBusProvider>();
services.AddScoped<IEventBusProvider>(_ => eventBusMock.Object);
services.Configure<EventBusOptions>(
o =>
{
o.FailureCountBeforeDowngrade = 1;
o.DowngradeInterval = 3000;
});
}));
eventBusMock.Setup(x => x.PublishAsync(It.IsAny<string>(), It.IsAny<IntegrationEvent>()))
.ThrowsAsync(new InvalidOperationException());

// Act
var response = await builder.CreateClient().PostAsJsonAsync(
"/api/v1/strings",
new CreatePayload(false, data));
var content = await response.Content.ReadAsStringAsync();
await Task.Delay(3000); // hit at 1000ms and 3000ms

// Assert
response.Should().BeSuccessful();
content.Should().BeNullOrEmpty();
eventBusMock.Verify(
x => x.PublishAsync(It.IsAny<string>(), It.Is<TestIntegrationEvent>(t => t.Message == data)),
Times.Exactly(2));
}

[Fact]
public async Task EventBus_DowngradeThenRecover_RecoverAsync()
{
// Arrange
const string data = "hello";
var builder = new WebApplicationFactory<Program>();
var eventBusMock = new Mock<IEventBusProvider>();
builder = builder.WithWebHostBuilder(
b => b.ConfigureServices(
services =>
{
services.RemoveAll<IEventBusProvider>();
services.AddScoped<IEventBusProvider>(_ => eventBusMock.Object);
services.Configure<EventBusOptions>(
o =>
{
o.FailureCountBeforeDowngrade = 1;
o.DowngradeInterval = 4000;
});
}));
eventBusMock.Setup(x => x.PublishAsync(It.IsAny<string>(), It.IsAny<IntegrationEvent>()))
.ThrowsAsync(new InvalidOperationException());
await builder.CreateClient().PostAsJsonAsync(
"/api/v1/strings",
new CreatePayload(false, data));
await Task.Delay(1000); // failed, now it is downgraded

// Act
eventBusMock.Reset();
await Task.Delay(2000); // recover
await builder.CreateClient().PostAsJsonAsync(
"/api/v1/strings",
new CreatePayload(false, data));
await Task.Delay(1000);

// Assert
eventBusMock.Verify(
x => x.PublishAsync(It.IsAny<string>(), It.Is<TestIntegrationEvent>(t => t.Message == data)),
Times.Exactly(2));
}
}