Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions src/Cnblogs.Architecture.Ddd.EventBus.Dapr/EndPointExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ public static IEndpointConventionBuilder Subscribe<TEvent>(
string appName)
where TEvent : IntegrationEvent
{
var daprOptions = builder.ServiceProvider.GetRequiredService<IOptions<DaprOptions>>().Value;
EnsureDaprSubscribeHandlerMapped(builder, daprOptions);
EnsureEventBusRegistered(builder, daprOptions);

var result = builder
var result = builder.EnsureProvision()
.MapPost(route, (TEvent receivedEvent, IEventBus eventBus) => eventBus.ReceiveAsync(receivedEvent))
.WithTopic(DaprOptions.PubSubName, DaprUtils.GetDaprTopicName<TEvent>(appName));
return result;
Expand All @@ -80,6 +76,8 @@ public static IEndpointConventionBuilder Subscribe<TEvent>(
/// <param name="assemblies"><see cref="Assembly"/></param>
public static void Subscribe(this IEndpointRouteBuilder builder, params Assembly[] assemblies)
{
builder.EnsureProvision();

var method = typeof(EndPointExtensions).GetMethod(
nameof(Subscribe),
new[] { typeof(IEndpointRouteBuilder), typeof(string) })!;
Expand All @@ -100,11 +98,11 @@ public static void Subscribe(this IEndpointRouteBuilder builder, params Assembly
}
}

private static void EnsureEventBusRegistered(IEndpointRouteBuilder builder, DaprOptions daprOptions)
private static DaprOptions EnsureEventBusRegistered(this DaprOptions daprOptions, IEndpointRouteBuilder builder)
{
if (daprOptions.IsEventBusRegistered)
{
return;
return daprOptions;
}

var serviceCheck = builder.ServiceProvider.GetRequiredService<IServiceProviderIsService>();
Expand All @@ -115,13 +113,14 @@ private static void EnsureEventBusRegistered(IEndpointRouteBuilder builder, Dapr
}

daprOptions.IsEventBusRegistered = true;
return daprOptions;
}

private static void EnsureDaprSubscribeHandlerMapped(IEndpointRouteBuilder builder, DaprOptions daprOptions)
private static DaprOptions EnsureDaprSubscribeHandlerMapped(this DaprOptions daprOptions, IEndpointRouteBuilder builder)
{
if (daprOptions.IsDaprSubscribeHandlerMapped)
{
return;
return daprOptions;
}

if (builder is IApplicationBuilder app)
Expand All @@ -131,5 +130,17 @@ private static void EnsureDaprSubscribeHandlerMapped(IEndpointRouteBuilder build

builder.MapSubscribeHandler();
daprOptions.IsDaprSubscribeHandlerMapped = true;
return daprOptions;
}

private static DaprOptions GetDaprOptions(this IEndpointRouteBuilder builder)
=> builder.ServiceProvider.GetRequiredService<IOptions<DaprOptions>>().Value;

private static IEndpointRouteBuilder EnsureProvision(this IEndpointRouteBuilder builder)
{
builder.GetDaprOptions()
.EnsureDaprSubscribeHandlerMapped(builder)
.EnsureEventBusRegistered(builder);
return builder;
}
}
22 changes: 21 additions & 1 deletion test/Cnblogs.Architecture.IntegrationTests/DaprTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,27 @@ public async Task Dapr_SubscribeEndpoint_OkAsync()
// Assert
response.StatusCode.Should().Be(HttpStatusCode.OK);
var responseText = await response.Content.ReadAsStringAsync();
responseText.Should().Contain("pubsub");
responseText.Should().Contain(nameof(TestIntegrationEvent));
}

[Fact]
public async Task Dapr_Subscribe_Without_Any_Assembly_OkAsync()
{
// Arrange
var builder = WebApplication.CreateBuilder();
builder.Services.AddDaprEventBus(nameof(DaprTests));
builder.WebHost.UseTestServer();

var app = builder.Build();
app.Subscribe();
await app.StartAsync();
var httpClient = app.GetTestClient();

// Act
var response = await httpClient.GetAsync("/dapr/subscribe");

// Assert
response.StatusCode.Should().Be(HttpStatusCode.OK);
}

[Fact]
Expand Down