Skip to content

Commit 0f2098a

Browse files
committed
stream things
1 parent 71da1b5 commit 0f2098a

File tree

5 files changed

+170
-44
lines changed

5 files changed

+170
-44
lines changed

src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ public void GlobalSetup()
4747
var contextOptions = new HubConnectionContextOptions()
4848
{
4949
KeepAliveInterval = TimeSpan.Zero,
50+
StreamBufferCapacity = 10,
5051
};
5152
_connectionContext = new NoErrorHubConnectionContext(connection, contextOptions, NullLoggerFactory.Instance);
5253

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ private static class Log
7979
private static readonly Action<ILogger, string, Exception> _invalidHubParameters =
8080
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(22, "InvalidHubParameters"), "Parameters to hub method '{HubMethod}' are incorrect.");
8181

82+
private static readonly Action<ILogger, string, Exception> _invocationIdInUse =
83+
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(23, "InvocationIdInUse"), "Invocation ID '{InvocationId}' is already in use.");
84+
8285
public static void ReceivedHubInvocation(ILogger logger, InvocationMessage invocationMessage)
8386
{
8487
_receivedHubInvocation(logger, invocationMessage, null);
@@ -188,6 +191,11 @@ public static void InvalidHubParameters(ILogger logger, string hubMethod, Except
188191
{
189192
_invalidHubParameters(logger, hubMethod, exception);
190193
}
194+
195+
public static void InvocationIdInUse(ILogger logger, string InvocationId)
196+
{
197+
_invocationIdInUse(logger, InvocationId, null);
198+
}
191199
}
192200
}
193201
}

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -326,38 +326,7 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
326326

327327
if (isStreamResponse)
328328
{
329-
_ = ExecuteStreamInvocation();
330-
async Task ExecuteStreamInvocation()
331-
{
332-
cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);
333-
connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts);
334-
object result;
335-
336-
try
337-
{
338-
result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider);
339-
}
340-
catch (Exception ex)
341-
{
342-
Log.FailedInvokingHubMethod(_logger, hubMethodInvocationMessage.Target, ex);
343-
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
344-
ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors));
345-
return;
346-
}
347-
348-
if (result == null)
349-
{
350-
Log.InvalidReturnValueFromStreamingMethod(_logger, methodExecutor.MethodInfo.Name);
351-
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
352-
$"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>.");
353-
return;
354-
}
355-
356-
var enumerable = descriptor.FromReturnedStream(result, cts.Token);
357-
358-
Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor);
359-
await StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerable, scope, hubActivator, hub, cts, hubMethodInvocationMessage);
360-
}
329+
_ = StreamAsync(hubMethodInvocationMessage.InvocationId, connection, arguments, scope, hubActivator, hub, cts, hubMethodInvocationMessage, descriptor);
361330
}
362331
else
363332
{
@@ -447,13 +416,45 @@ private ValueTask CleanupInvocation(HubConnectionContext connection, HubMethodIn
447416
return scope.DisposeAsync();
448417
}
449418

450-
private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerable<object> enumerable, IServiceScope scope,
451-
IHubActivator<THub> hubActivator, THub hub, CancellationTokenSource streamCts, HubMethodInvocationMessage hubMethodInvocationMessage)
419+
private async Task StreamAsync(string invocationId, HubConnectionContext connection, object[] arguments, IServiceScope scope,
420+
IHubActivator<THub> hubActivator, THub hub, CancellationTokenSource streamCts, HubMethodInvocationMessage hubMethodInvocationMessage, HubMethodDescriptor descriptor)
452421
{
453422
string error = null;
454423

424+
streamCts = streamCts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);
425+
455426
try
456427
{
428+
if (!connection.ActiveRequestCancellationSources.TryAdd(invocationId, streamCts))
429+
{
430+
Log.InvocationIdInUse(_logger, invocationId);
431+
error = $"Invocation ID '{invocationId}' is already in use.";
432+
return;
433+
}
434+
435+
object result;
436+
try
437+
{
438+
result = await ExecuteHubMethod(descriptor.MethodExecutor, hub, arguments, connection, scope.ServiceProvider);
439+
}
440+
catch (Exception ex)
441+
{
442+
Log.FailedInvokingHubMethod(_logger, hubMethodInvocationMessage.Target, ex);
443+
error = ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors);
444+
return;
445+
}
446+
447+
if (result == null)
448+
{
449+
Log.InvalidReturnValueFromStreamingMethod(_logger, descriptor.MethodExecutor.MethodInfo.Name);
450+
error = $"The value returned by the streaming method '{descriptor.MethodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>.";
451+
return;
452+
}
453+
454+
var enumerable = descriptor.FromReturnedStream(result, streamCts.Token);
455+
456+
Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, descriptor.MethodExecutor);
457+
457458
await foreach (var streamItem in enumerable)
458459
{
459460
// Send the stream item
@@ -468,8 +469,7 @@ private async Task StreamResultsAsync(string invocationId, HubConnectionContext
468469
catch (Exception ex)
469470
{
470471
// If the streaming method was canceled we don't want to send a HubException message - this is not an error case
471-
if (!(ex is OperationCanceledException && connection.ActiveRequestCancellationSources.TryGetValue(invocationId, out var cts)
472-
&& cts.IsCancellationRequested))
472+
if (!(ex is OperationCanceledException && streamCts.IsCancellationRequested))
473473
{
474474
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex, _enableDetailedErrors);
475475
}
@@ -478,15 +478,10 @@ private async Task StreamResultsAsync(string invocationId, HubConnectionContext
478478
{
479479
await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope);
480480

481-
// Dispose the linked CTS for the stream.
482481
streamCts.Dispose();
482+
connection.ActiveRequestCancellationSources.TryRemove(invocationId, out _);
483483

484484
await connection.WriteAsync(CompletionMessage.WithError(invocationId, error));
485-
486-
if (connection.ActiveRequestCancellationSources.TryRemove(invocationId, out var cts))
487-
{
488-
cts.Dispose();
489-
}
490485
}
491486
}
492487

src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,13 +700,23 @@ public ChannelReader<string> BlockingStream()
700700
return Channel.CreateUnbounded<string>().Reader;
701701
}
702702

703-
public ChannelReader<int> ThrowStream()
703+
public ChannelReader<int> ExceptionStream()
704704
{
705705
var channel = Channel.CreateUnbounded<int>();
706706
channel.Writer.TryComplete(new Exception("Exception from channel"));
707707
return channel.Reader;
708708
}
709709

710+
public ChannelReader<int> ThrowStream()
711+
{
712+
throw new Exception("Throw from hub method");
713+
}
714+
715+
public ChannelReader<int> NullStream()
716+
{
717+
return null;
718+
}
719+
710720
public int NonStream()
711721
{
712722
return 42;

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs

Lines changed: 113 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2087,7 +2087,7 @@ public async Task ReceiveCorrectErrorFromStreamThrowing(bool detailedErrors)
20872087

20882088
await client.Connected.OrTimeout();
20892089

2090-
var messages = await client.StreamAsync(nameof(StreamingHub.ThrowStream));
2090+
var messages = await client.StreamAsync(nameof(StreamingHub.ExceptionStream));
20912091

20922092
Assert.Equal(1, messages.Count);
20932093
var completion = messages[0] as CompletionMessage;
@@ -3108,6 +3108,118 @@ public async Task StreamingInvocationsDoNotBlockOtherInvocations()
31083108
}
31093109
}
31103110

3111+
[Fact]
3112+
public async Task StreamMethodThatThrowsWillCleanup()
3113+
{
3114+
bool ExpectedErrors(WriteContext writeContext)
3115+
{
3116+
return writeContext.LoggerName == "Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher" &&
3117+
writeContext.EventId.Name == "FailedInvokingHubMethod";
3118+
}
3119+
3120+
using (StartVerifiableLog(ExpectedErrors))
3121+
{
3122+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
3123+
{
3124+
builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>));
3125+
}, LoggerFactory);
3126+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<StreamingHub>>();
3127+
3128+
using (var client = new TestClient())
3129+
{
3130+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
3131+
3132+
await client.Connected.OrTimeout();
3133+
3134+
var messages = await client.StreamAsync(nameof(StreamingHub.ThrowStream));
3135+
3136+
Assert.Equal(1, messages.Count);
3137+
var completion = messages[0] as CompletionMessage;
3138+
Assert.NotNull(completion);
3139+
3140+
var hubActivator = serviceProvider.GetService<IHubActivator<StreamingHub>>() as CustomHubActivator<StreamingHub>;
3141+
3142+
// OnConnectedAsync and ThrowStream hubs have been disposed
3143+
Assert.Equal(2, hubActivator.ReleaseCount);
3144+
3145+
client.Dispose();
3146+
3147+
await connectionHandlerTask.OrTimeout();
3148+
}
3149+
}
3150+
}
3151+
3152+
[Fact]
3153+
public async Task StreamMethodThatReturnsNullWillCleanup()
3154+
{
3155+
using (StartVerifiableLog())
3156+
{
3157+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
3158+
{
3159+
builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>));
3160+
}, LoggerFactory);
3161+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<StreamingHub>>();
3162+
3163+
using (var client = new TestClient())
3164+
{
3165+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
3166+
3167+
await client.Connected.OrTimeout();
3168+
3169+
var messages = await client.StreamAsync(nameof(StreamingHub.NullStream));
3170+
3171+
Assert.Equal(1, messages.Count);
3172+
var completion = messages[0] as CompletionMessage;
3173+
Assert.NotNull(completion);
3174+
3175+
var hubActivator = serviceProvider.GetService<IHubActivator<StreamingHub>>() as CustomHubActivator<StreamingHub>;
3176+
3177+
// OnConnectedAsync and NullStream hubs have been disposed
3178+
Assert.Equal(2, hubActivator.ReleaseCount);
3179+
3180+
client.Dispose();
3181+
3182+
await connectionHandlerTask.OrTimeout();
3183+
}
3184+
}
3185+
}
3186+
3187+
[Fact]
3188+
public async Task StreamMethodWithDuplicateIdFails()
3189+
{
3190+
using (StartVerifiableLog())
3191+
{
3192+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
3193+
{
3194+
builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>));
3195+
}, LoggerFactory);
3196+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<StreamingHub>>();
3197+
3198+
using (var client = new TestClient())
3199+
{
3200+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler);
3201+
3202+
await client.Connected.OrTimeout();
3203+
3204+
await client.SendHubMessageAsync(new StreamInvocationMessage("123", nameof(StreamingHub.BlockingStream), Array.Empty<object>())).OrTimeout();
3205+
3206+
await client.SendHubMessageAsync(new StreamInvocationMessage("123", nameof(StreamingHub.BlockingStream), Array.Empty<object>())).OrTimeout();
3207+
3208+
var completion = Assert.IsType<CompletionMessage>(await client.ReadAsync().OrTimeout());
3209+
Assert.Equal("Invocation ID '123' is already in use.", completion.Error);
3210+
3211+
var hubActivator = serviceProvider.GetService<IHubActivator<StreamingHub>>() as CustomHubActivator<StreamingHub>;
3212+
3213+
// OnConnectedAsync and BlockingStream hubs have been disposed
3214+
Assert.Equal(2, hubActivator.ReleaseCount);
3215+
3216+
client.Dispose();
3217+
3218+
await connectionHandlerTask.OrTimeout();
3219+
}
3220+
}
3221+
}
3222+
31113223
[Fact]
31123224
public async Task InvocationsRunInOrderWithNoParallelism()
31133225
{

0 commit comments

Comments
 (0)