From 212c1e75e97cd5c21b94a754bbfd5f221c460fee Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 23 Jun 2020 15:02:48 -0700 Subject: [PATCH 01/21] stash --- .../server/Core/src/HubConnectionContext.cs | 3 ++ .../server/Core/src/HubConnectionHandler.cs | 2 -- .../Core/src/Internal/DefaultHubDispatcher.cs | 31 +++++++++++++++---- 3 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 0e98a6ee08ad..810a18f0897a 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -93,6 +93,9 @@ internal StreamTracker StreamTracker internal Exception? CloseException { get; private set; } + internal List ActiveHubInvocations { get; } = new List(); + + /// /// Gets a that notifies when the connection is aborted. /// diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index c5970a96a6de..16ff297cf65e 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -261,7 +261,6 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) while (protocol.TryParseMessage(ref buffer, binder, out var message)) { messageReceived = true; - connection.StopClientTimeout(); await _dispatcher.DispatchMessageAsync(connection, message); } @@ -289,7 +288,6 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) if (protocol.TryParseMessage(ref segment, binder, out var message)) { messageReceived = true; - connection.StopClientTimeout(); await _dispatcher.DispatchMessageAsync(connection, message); } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 889ee5dfc86f..9066d6e90925 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -144,24 +144,31 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection, public override Task DispatchMessageAsync(HubConnectionContext connection, HubMessage hubMessage) { + connection.StopClientTimeout(); // Messages are dispatched sequentially and will stop other messages from being processed until they complete. // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. + var tasks = connection.ActiveHubInvocations; + switch (hubMessage) { case InvocationBindingFailureMessage bindingFailureMessage: - return ProcessInvocationBindingFailure(connection, bindingFailureMessage); + tasks.Add(ProcessInvocationBindingFailure(connection, bindingFailureMessage)); + break; case StreamBindingFailureMessage bindingFailureMessage: - return ProcessStreamBindingFailure(connection, bindingFailureMessage); + tasks.Add(ProcessStreamBindingFailure(connection, bindingFailureMessage)); + break; case InvocationMessage invocationMessage: Log.ReceivedHubInvocation(_logger, invocationMessage); - return ProcessInvocation(connection, invocationMessage, isStreamResponse: false); + tasks.Add(ProcessInvocation(connection, invocationMessage, isStreamResponse: false)); + break; case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); + tasks.Add(ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true)); + break; case CancelInvocationMessage cancelInvocationMessage: // Check if there is an associated active stream and cancel it if it exists. @@ -183,7 +190,8 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe break; case StreamItemMessage streamItem: - return ProcessStreamItem(connection, streamItem); + tasks.Add(ProcessStreamItem(connection, streamItem)); + break; case CompletionMessage streamCompleteMessage: // closes channels, removes from Lookup dict @@ -204,6 +212,18 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe throw new NotSupportedException($"Received unsupported message: {hubMessage}"); } + var maxParallelOption = 2; + if (tasks.Count == maxParallelOption) + { + return WaitForSpace(tasks); + } + + static async Task WaitForSpace(List tasks) + { + await Task.WhenAny(tasks); + tasks.RemoveAll(t => t.IsCompleted); + } + return Task.CompletedTask; } @@ -229,7 +249,6 @@ private Task ProcessStreamBindingFailure(HubConnectionContext connection, Stream connection.StreamTracker.TryComplete(message); // TODO: Send stream completion message to client when we add it - return Task.CompletedTask; } From 6e89cd137ff13f4f9fc67afae96623df570abaf6 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 29 Jun 2020 17:17:48 -0700 Subject: [PATCH 02/21] stash --- .../server/Core/src/HubConnectionContext.cs | 4 ++ .../Core/src/HubConnectionContextOptions.cs | 2 + .../server/Core/src/HubConnectionHandler.cs | 5 +- src/SignalR/server/Core/src/HubOptions.cs | 6 ++ .../server/Core/src/HubOptionsSetup`T.cs | 1 + .../Core/src/Internal/DefaultHubDispatcher.cs | 62 +++++++++++-------- .../SignalR/test/HubConnectionHandlerTests.cs | 53 +++++++++++++++- 7 files changed, 104 insertions(+), 29 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 810a18f0897a..0307645b16a7 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -73,6 +73,9 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo _systemClock = contextOptions.SystemClock ?? new SystemClock(); _lastSendTimeStamp = _systemClock.UtcNowTicks; + + var maxInvokes = contextOptions.MaximumParallelInvocations; + ActiveInvocationLimit = new SemaphoreSlim(maxInvokes - 1, maxInvokes); } internal StreamTracker StreamTracker @@ -95,6 +98,7 @@ internal StreamTracker StreamTracker internal List ActiveHubInvocations { get; } = new List(); + internal SemaphoreSlim ActiveInvocationLimit { get; } /// /// Gets a that notifies when the connection is aborted. diff --git a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs index 4626d195cce4..2ed772ae6057 100644 --- a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs +++ b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs @@ -32,5 +32,7 @@ public class HubConnectionContextOptions public long? MaximumReceiveMessageSize { get; set; } internal ISystemClock SystemClock { get; set; } = default!; + + public int MaximumParallelInvocations { get; set; } } } diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index 16ff297cf65e..bffe776d89fd 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -31,6 +31,7 @@ public class HubConnectionHandler : ConnectionHandler where THub : Hub private readonly HubDispatcher _dispatcher; private readonly bool _enableDetailedErrors; private readonly long? _maximumMessageSize; + private readonly int _maxParallelInvokes; // Internal for testing internal ISystemClock SystemClock { get; set; } = new SystemClock(); @@ -70,6 +71,7 @@ IServiceScopeFactory serviceScopeFactory { _maximumMessageSize = _hubOptions.MaximumReceiveMessageSize; _enableDetailedErrors = _hubOptions.EnableDetailedErrors ?? _enableDetailedErrors; + _maxParallelInvokes = _hubOptions.MaxParallelInvocationsPerClient; if (_hubOptions.HubFilters != null) { @@ -81,6 +83,7 @@ IServiceScopeFactory serviceScopeFactory { _maximumMessageSize = _globalHubOptions.MaximumReceiveMessageSize; _enableDetailedErrors = _globalHubOptions.EnableDetailedErrors ?? _enableDetailedErrors; + _maxParallelInvokes = _globalHubOptions.MaxParallelInvocationsPerClient; if (_globalHubOptions.HubFilters != null) { @@ -118,6 +121,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection) StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity, MaximumReceiveMessageSize = _maximumMessageSize, SystemClock = SystemClock, + MaximumParallelInvocations = _maxParallelInvokes, }; Log.ConnectedStarting(_logger); @@ -237,7 +241,6 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) var protocol = connection.Protocol; connection.BeginClientTimeout(); - var binder = new HubConnectionBinder(_dispatcher, connection); while (true) diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index a9a889909de1..bd571181bd42 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -53,5 +53,11 @@ public class HubOptions public int? StreamBufferCapacity { get; set; } = null; internal List? HubFilters { get; set; } + + /// + /// By default a client is only allowed to invoke a single Hub method at a time. + /// Changing this property will allow clients to invoke multiple invocations at the same time before queueing. + /// + public int MaxParallelInvocationsPerClient { get; set; } = 2; } } diff --git a/src/SignalR/server/Core/src/HubOptionsSetup`T.cs b/src/SignalR/server/Core/src/HubOptionsSetup`T.cs index a935980e0936..dfcdca74d676 100644 --- a/src/SignalR/server/Core/src/HubOptionsSetup`T.cs +++ b/src/SignalR/server/Core/src/HubOptionsSetup`T.cs @@ -25,6 +25,7 @@ public void Configure(HubOptions options) options.EnableDetailedErrors = _hubOptions.EnableDetailedErrors; options.MaximumReceiveMessageSize = _hubOptions.MaximumReceiveMessageSize; options.StreamBufferCapacity = _hubOptions.StreamBufferCapacity; + options.MaxParallelInvocationsPerClient = _hubOptions.MaxParallelInvocationsPerClient; options.UserHasSetValues = true; diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 9066d6e90925..94a1ddf828a7 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -145,29 +145,28 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection, public override Task DispatchMessageAsync(HubConnectionContext connection, HubMessage hubMessage) { connection.StopClientTimeout(); + // Messages are dispatched sequentially and will stop other messages from being processed until they complete. // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. - var tasks = connection.ActiveHubInvocations; - switch (hubMessage) { case InvocationBindingFailureMessage bindingFailureMessage: - tasks.Add(ProcessInvocationBindingFailure(connection, bindingFailureMessage)); + _ = ProcessInvocationBindingFailure(connection, bindingFailureMessage); break; case StreamBindingFailureMessage bindingFailureMessage: - tasks.Add(ProcessStreamBindingFailure(connection, bindingFailureMessage)); + _ = ProcessStreamBindingFailure(connection, bindingFailureMessage); break; case InvocationMessage invocationMessage: Log.ReceivedHubInvocation(_logger, invocationMessage); - tasks.Add(ProcessInvocation(connection, invocationMessage, isStreamResponse: false)); + _ = ProcessInvocation(connection, invocationMessage, isStreamResponse: false); break; case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - tasks.Add(ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true)); + _ = ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); break; case CancelInvocationMessage cancelInvocationMessage: @@ -183,14 +182,14 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe // Stream can be canceled on the server while client is canceling stream. Log.UnexpectedCancel(_logger); } - break; + return Task.CompletedTask; case PingMessage _: connection.StartClientTimeout(); - break; + return Task.CompletedTask; case StreamItemMessage streamItem: - tasks.Add(ProcessStreamItem(connection, streamItem)); + _ = ProcessStreamItem(connection, streamItem); break; case CompletionMessage streamCompleteMessage: @@ -204,7 +203,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe { Log.UnexpectedStreamCompletion(_logger); } - break; + return Task.CompletedTask; // Other kind of message we weren't expecting default: @@ -212,19 +211,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe throw new NotSupportedException($"Received unsupported message: {hubMessage}"); } - var maxParallelOption = 2; - if (tasks.Count == maxParallelOption) - { - return WaitForSpace(tasks); - } - - static async Task WaitForSpace(List tasks) - { - await Task.WhenAny(tasks); - tasks.RemoveAll(t => t.IsCompleted); - } - - return Task.CompletedTask; + return connection.ActiveInvocationLimit.WaitAsync(); } private Task ProcessInvocationBindingFailure(HubConnectionContext connection, InvocationBindingFailureMessage bindingFailureMessage) @@ -233,7 +220,9 @@ private Task ProcessInvocationBindingFailure(HubConnectionContext connection, In var errorMessage = ErrorMessageHelper.BuildErrorMessage($"Failed to invoke '{bindingFailureMessage.Target}' due to an error on the server.", bindingFailureMessage.BindingFailure.SourceException, _enableDetailedErrors); - return SendInvocationError(bindingFailureMessage.InvocationId, connection, errorMessage); + var t = SendInvocationError(bindingFailureMessage.InvocationId, connection, errorMessage); + connection.ActiveInvocationLimit.Release(); + return t; } private Task ProcessStreamBindingFailure(HubConnectionContext connection, StreamBindingFailureMessage bindingFailureMessage) @@ -248,6 +237,8 @@ private Task ProcessStreamBindingFailure(HubConnectionContext connection, Stream // ignore failure, it means the client already completed the stream or the stream never existed on the server connection.StreamTracker.TryComplete(message); + connection.ActiveInvocationLimit.Release(); + // TODO: Send stream completion message to client when we add it return Task.CompletedTask; } @@ -257,11 +248,24 @@ private Task ProcessStreamItem(HubConnectionContext connection, StreamItemMessag if (!connection.StreamTracker.TryProcessItem(message, out var processTask)) { Log.UnexpectedStreamItem(_logger); + connection.ActiveInvocationLimit.Release(); return Task.CompletedTask; } Log.ReceivedStreamItem(_logger, message); - return processTask; + return ProcessTask(connection, processTask); + + static async Task ProcessTask(HubConnectionContext connection, Task task) + { + try + { + await task; + } + finally + { + connection.ActiveInvocationLimit.Release(); + } + } } private Task ProcessInvocation(HubConnectionContext connection, @@ -271,8 +275,12 @@ private Task ProcessInvocation(HubConnectionContext connection, { // Send an error to the client. Then let the normal completion process occur Log.UnknownHubMethod(_logger, hubMethodInvocationMessage.Target); - return connection.WriteAsync(CompletionMessage.WithError( + var t = connection.WriteAsync(CompletionMessage.WithError( hubMethodInvocationMessage.InvocationId, $"Unknown hub method '{hubMethodInvocationMessage.Target}'")).AsTask(); + + connection.ActiveInvocationLimit.Release(); + + return t; } else { @@ -456,6 +464,8 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, { await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope); } + + connection.ActiveInvocationLimit.Release(); } } diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 32d198d2fd01..d57f5c17d657 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2923,7 +2923,10 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => { services.Configure(options => - options.ClientTimeoutInterval = TimeSpan.FromMilliseconds(0)); + { + options.ClientTimeoutInterval = TimeSpan.FromMilliseconds(0); + options.MaxParallelInvocationsPerClient = 1; + }); services.AddSingleton(tcsService); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -2963,6 +2966,42 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() } } + [Fact] + public async Task HubMethodInvokeCountsTowardsClientTimeoutIfParallelNotMaxed() + { + using (StartVerifiableLog()) + { + var tcsService = new TcsService(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => + { + services.Configure(options => + { + options.ClientTimeoutInterval = TimeSpan.FromMilliseconds(0); + options.MaxParallelInvocationsPerClient = 2; + }); + services.AddSingleton(tcsService); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient(new JsonHubProtocol())) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); + // This starts the timeout logic + await client.SendHubMessageAsync(PingMessage.Instance); + + // Call long running hub method + var hubMethodTask = client.InvokeAsync(nameof(LongRunningHub.LongRunningMethod)); + await tcsService.StartedMethod.Task.OrTimeout(); + + // Tick heartbeat while hub method is running + client.TickHeartbeat(); + + // Connection is closed + await connectionHandlerTask.OrTimeout(); + } + } + } + [Fact] public async Task EndingConnectionSendsCloseMessageWithNoError() { @@ -3062,7 +3101,7 @@ public async Task StreamingInvocationsDoNotBlockOtherInvocations() } [Fact] - public async Task InvocationsRunInOrder() + public async Task InvocationsRunInOrderWithNoParallelism() { using (StartVerifiableLog()) { @@ -3070,6 +3109,11 @@ public async Task InvocationsRunInOrder() var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => { builder.AddSingleton(tcsService); + + builder.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 1; + }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3121,6 +3165,11 @@ public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming( { builder.AddSingleton(tcsService); builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>)); + + builder.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 1; + }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); From 27c636f1c74796778edab8b1bb7d1341b0cbaa51 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 30 Jun 2020 12:59:21 -0700 Subject: [PATCH 03/21] cleanup --- .../server/Core/src/HubConnectionContext.cs | 3 - .../Core/src/HubConnectionContextOptions.cs | 2 +- src/SignalR/server/Core/src/HubOptions.cs | 2 +- .../Core/src/Internal/DefaultHubDispatcher.cs | 51 +++++++---------- .../SignalR/test/HubConnectionHandlerTests.cs | 56 +++++++++++++++++++ 5 files changed, 78 insertions(+), 36 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 0307645b16a7..2f63512f6852 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -95,9 +95,6 @@ internal StreamTracker StreamTracker internal HubCallerContext HubCallerContext { get; } internal Exception? CloseException { get; private set; } - - internal List ActiveHubInvocations { get; } = new List(); - internal SemaphoreSlim ActiveInvocationLimit { get; } /// diff --git a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs index 2ed772ae6057..5283ec6fe64c 100644 --- a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs +++ b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs @@ -33,6 +33,6 @@ public class HubConnectionContextOptions internal ISystemClock SystemClock { get; set; } = default!; - public int MaximumParallelInvocations { get; set; } + public int MaximumParallelInvocations { get; set; } = 1; } } diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index bd571181bd42..c704bdd78ed8 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -58,6 +58,6 @@ public class HubOptions /// By default a client is only allowed to invoke a single Hub method at a time. /// Changing this property will allow clients to invoke multiple invocations at the same time before queueing. /// - public int MaxParallelInvocationsPerClient { get; set; } = 2; + public int MaxParallelInvocationsPerClient { get; set; } = 1; } } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 94a1ddf828a7..5a898fa747c0 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -152,21 +152,21 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe switch (hubMessage) { case InvocationBindingFailureMessage bindingFailureMessage: - _ = ProcessInvocationBindingFailure(connection, bindingFailureMessage); + _ = ProcessTask(connection, ProcessInvocationBindingFailure(connection, bindingFailureMessage)); break; case StreamBindingFailureMessage bindingFailureMessage: - _ = ProcessStreamBindingFailure(connection, bindingFailureMessage); + _ = ProcessTask(connection, ProcessStreamBindingFailure(connection, bindingFailureMessage)); break; case InvocationMessage invocationMessage: Log.ReceivedHubInvocation(_logger, invocationMessage); - _ = ProcessInvocation(connection, invocationMessage, isStreamResponse: false); + _ = ProcessTask(connection, ProcessInvocation(connection, invocationMessage, isStreamResponse: false)); break; case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - _ = ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); + _ = ProcessTask(connection, ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true)); break; case CancelInvocationMessage cancelInvocationMessage: @@ -189,7 +189,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe return Task.CompletedTask; case StreamItemMessage streamItem: - _ = ProcessStreamItem(connection, streamItem); + _ = ProcessTask(connection, ProcessStreamItem(connection, streamItem)); break; case CompletionMessage streamCompleteMessage: @@ -212,6 +212,18 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe } return connection.ActiveInvocationLimit.WaitAsync(); + + static async Task ProcessTask(HubConnectionContext connection, Task task) + { + try + { + await task; + } + finally + { + connection.ActiveInvocationLimit.Release(); + } + } } private Task ProcessInvocationBindingFailure(HubConnectionContext connection, InvocationBindingFailureMessage bindingFailureMessage) @@ -220,9 +232,7 @@ private Task ProcessInvocationBindingFailure(HubConnectionContext connection, In var errorMessage = ErrorMessageHelper.BuildErrorMessage($"Failed to invoke '{bindingFailureMessage.Target}' due to an error on the server.", bindingFailureMessage.BindingFailure.SourceException, _enableDetailedErrors); - var t = SendInvocationError(bindingFailureMessage.InvocationId, connection, errorMessage); - connection.ActiveInvocationLimit.Release(); - return t; + return SendInvocationError(bindingFailureMessage.InvocationId, connection, errorMessage); } private Task ProcessStreamBindingFailure(HubConnectionContext connection, StreamBindingFailureMessage bindingFailureMessage) @@ -237,8 +247,6 @@ private Task ProcessStreamBindingFailure(HubConnectionContext connection, Stream // ignore failure, it means the client already completed the stream or the stream never existed on the server connection.StreamTracker.TryComplete(message); - connection.ActiveInvocationLimit.Release(); - // TODO: Send stream completion message to client when we add it return Task.CompletedTask; } @@ -248,24 +256,11 @@ private Task ProcessStreamItem(HubConnectionContext connection, StreamItemMessag if (!connection.StreamTracker.TryProcessItem(message, out var processTask)) { Log.UnexpectedStreamItem(_logger); - connection.ActiveInvocationLimit.Release(); return Task.CompletedTask; } Log.ReceivedStreamItem(_logger, message); - return ProcessTask(connection, processTask); - - static async Task ProcessTask(HubConnectionContext connection, Task task) - { - try - { - await task; - } - finally - { - connection.ActiveInvocationLimit.Release(); - } - } + return processTask; } private Task ProcessInvocation(HubConnectionContext connection, @@ -275,12 +270,8 @@ private Task ProcessInvocation(HubConnectionContext connection, { // Send an error to the client. Then let the normal completion process occur Log.UnknownHubMethod(_logger, hubMethodInvocationMessage.Target); - var t = connection.WriteAsync(CompletionMessage.WithError( + return connection.WriteAsync(CompletionMessage.WithError( hubMethodInvocationMessage.InvocationId, $"Unknown hub method '{hubMethodInvocationMessage.Target}'")).AsTask(); - - connection.ActiveInvocationLimit.Release(); - - return t; } else { @@ -464,8 +455,6 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, { await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope); } - - connection.ActiveInvocationLimit.Release(); } } diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index d57f5c17d657..60ef1513def0 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -3155,6 +3155,62 @@ public async Task InvocationsRunInOrderWithNoParallelism() } } + [Fact] + public async Task InvocationsCanRunOutOfOrderWithParallelism() + { + using (StartVerifiableLog()) + { + var tcsService = new TcsService(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => + { + builder.AddSingleton(tcsService); + + builder.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 2; + }); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + // Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + + // Long running hub invocation to test that other invocations will not run until it is completed + await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout(); + // Wait for the long running method to start + await tcsService.StartedMethod.Task.OrTimeout(); + + for (var i = 0; i < 5; i++) + { + // Invoke another hub method which will wait for the first method to finish + await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout(); + + // simple hub method result + var secondResult = await client.ReadAsync().OrTimeout(); + + var simpleCompletion = Assert.IsType(secondResult); + Assert.Equal(21L, simpleCompletion.Result); + } + + // Release the long running hub method + tcsService.EndMethod.TrySetResult(null); + + // Long running hub method result + var firstResult = await client.ReadAsync().OrTimeout(); + + var longRunningCompletion = Assert.IsType(firstResult); + Assert.Equal(12L, longRunningCompletion.Result); + + // Shut down + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + } + } + } + [Fact] public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming() { From cbd584465ee1167ae085af834ef64778ab208227 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 30 Jun 2020 13:21:02 -0700 Subject: [PATCH 04/21] t --- src/SignalR/server/Core/src/HubOptions.cs | 16 +++++++++++++++- .../server/SignalR/test/AddSignalRTests.cs | 9 +++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index c704bdd78ed8..05db3697adc2 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -58,6 +58,20 @@ public class HubOptions /// By default a client is only allowed to invoke a single Hub method at a time. /// Changing this property will allow clients to invoke multiple invocations at the same time before queueing. /// - public int MaxParallelInvocationsPerClient { get; set; } = 1; + public int MaxParallelInvocationsPerClient + { + get => _maxParallelInvocationsPerClient; + set + { + if (value < 1) + { + throw new ArgumentOutOfRangeException(nameof(MaxParallelInvocationsPerClient)); + } + + _maxParallelInvocationsPerClient = value; + } + } + + private int _maxParallelInvocationsPerClient = 1; } } diff --git a/src/SignalR/server/SignalR/test/AddSignalRTests.cs b/src/SignalR/server/SignalR/test/AddSignalRTests.cs index a8cd5a93429d..fa8f97e1927d 100644 --- a/src/SignalR/server/SignalR/test/AddSignalRTests.cs +++ b/src/SignalR/server/SignalR/test/AddSignalRTests.cs @@ -105,6 +105,7 @@ public void HubSpecificOptionsHaveSameValuesAsGlobalHubOptions() Assert.Equal(globalHubOptions.HandshakeTimeout, hubOptions.HandshakeTimeout); Assert.Equal(globalHubOptions.SupportedProtocols, hubOptions.SupportedProtocols); Assert.Equal(globalHubOptions.ClientTimeoutInterval, hubOptions.ClientTimeoutInterval); + Assert.Equal(globalHubOptions.MaxParallelInvocationsPerClient, hubOptions.MaxParallelInvocationsPerClient); Assert.True(hubOptions.UserHasSetValues); } @@ -138,6 +139,7 @@ public void UserSpecifiedOptionsRunAfterDefaultOptions() options.HandshakeTimeout = null; options.SupportedProtocols = null; options.ClientTimeoutInterval = TimeSpan.FromSeconds(1); + options.MaxParallelInvocationsPerClient = 3; }); var serviceProvider = serviceCollection.BuildServiceProvider(); @@ -149,6 +151,7 @@ public void UserSpecifiedOptionsRunAfterDefaultOptions() Assert.Null(globalOptions.KeepAliveInterval); Assert.Null(globalOptions.HandshakeTimeout); Assert.Null(globalOptions.SupportedProtocols); + Assert.Equal(3, globalOptions.MaxParallelInvocationsPerClient); Assert.Equal(TimeSpan.FromSeconds(1), globalOptions.ClientTimeoutInterval); } @@ -175,6 +178,12 @@ public void HubProtocolsWithNonDefaultAttributeNotAddedToSupportedProtocols() Assert.Equal("messagepack", p); }); } + + [Fact] + public void ThrowsIfSetInvalidValueForMaxInvokes() + { + Assert.Throws(() => new HubOptions() { MaxParallelInvocationsPerClient = 0 }); + } } public class CustomHub : Hub From a088996799650a357534d1592f9c3bfe7eb9cf9f Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 30 Jun 2020 13:47:44 -0700 Subject: [PATCH 05/21] nit --- src/SignalR/server/Core/src/HubConnectionContextOptions.cs | 3 +++ src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs | 1 + 2 files changed, 4 insertions(+) diff --git a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs index 5283ec6fe64c..54ada054cf41 100644 --- a/src/SignalR/server/Core/src/HubConnectionContextOptions.cs +++ b/src/SignalR/server/Core/src/HubConnectionContextOptions.cs @@ -33,6 +33,9 @@ public class HubConnectionContextOptions internal ISystemClock SystemClock { get; set; } = default!; + /// + /// Gets or sets the maximum parallel hub method invocations. + /// public int MaximumParallelInvocations { get; set; } = 1; } } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 5a898fa747c0..3a689e8a0eeb 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -148,6 +148,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe // Messages are dispatched sequentially and will stop other messages from being processed until they complete. // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. + // With parallel invokes enabled, messages run sequentially until they go async and then the next message will be allowed to start running. switch (hubMessage) { From 9452c4bae105d826f66f076f493407b99635195f Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 2 Jul 2020 13:45:49 -0700 Subject: [PATCH 06/21] layer --- .../server/Core/src/HubConnectionHandler.cs | 24 +++++- .../Core/src/Internal/DefaultHubDispatcher.cs | 37 +++------ .../SignalR/test/HubConnectionHandlerTests.cs | 78 ++++++++++++++++++- 3 files changed, 107 insertions(+), 32 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index bffe776d89fd..33839c2365fd 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -264,7 +264,7 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) while (protocol.TryParseMessage(ref buffer, binder, out var message)) { messageReceived = true; - await _dispatcher.DispatchMessageAsync(connection, message); + await DispatchMessage(connection, _dispatcher, message); } if (messageReceived) @@ -291,8 +291,7 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) if (protocol.TryParseMessage(ref segment, binder, out var message)) { messageReceived = true; - - await _dispatcher.DispatchMessageAsync(connection, message); + await DispatchMessage(connection, _dispatcher, message); } else if (overLength) { @@ -331,6 +330,25 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) // before yielding the read again. input.AdvanceTo(buffer.Start, buffer.End); } + + static async Task DispatchMessage(HubConnectionContext connection, HubDispatcher dispatcher, HubMessage message) + { + connection.StopClientTimeout(); + _ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message)); + await connection.ActiveInvocationLimit.WaitAsync(); + } + + static async Task ProcessTask(HubConnectionContext connection, Task task) + { + try + { + await task; + } + finally + { + connection.ActiveInvocationLimit.Release(); + } + } } } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 3a689e8a0eeb..39b4dc4e1d24 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -144,8 +144,6 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection, public override Task DispatchMessageAsync(HubConnectionContext connection, HubMessage hubMessage) { - connection.StopClientTimeout(); - // Messages are dispatched sequentially and will stop other messages from being processed until they complete. // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. // With parallel invokes enabled, messages run sequentially until they go async and then the next message will be allowed to start running. @@ -153,22 +151,18 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe switch (hubMessage) { case InvocationBindingFailureMessage bindingFailureMessage: - _ = ProcessTask(connection, ProcessInvocationBindingFailure(connection, bindingFailureMessage)); - break; + return ProcessInvocationBindingFailure(connection, bindingFailureMessage); case StreamBindingFailureMessage bindingFailureMessage: - _ = ProcessTask(connection, ProcessStreamBindingFailure(connection, bindingFailureMessage)); - break; + return ProcessStreamBindingFailure(connection, bindingFailureMessage); case InvocationMessage invocationMessage: Log.ReceivedHubInvocation(_logger, invocationMessage); - _ = ProcessTask(connection, ProcessInvocation(connection, invocationMessage, isStreamResponse: false)); - break; + return ProcessInvocation(connection, invocationMessage, isStreamResponse: false); case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - _ = ProcessTask(connection, ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true)); - break; + return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); case CancelInvocationMessage cancelInvocationMessage: // Check if there is an associated active stream and cancel it if it exists. @@ -183,15 +177,14 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe // Stream can be canceled on the server while client is canceling stream. Log.UnexpectedCancel(_logger); } - return Task.CompletedTask; + break; case PingMessage _: connection.StartClientTimeout(); - return Task.CompletedTask; + break; case StreamItemMessage streamItem: - _ = ProcessTask(connection, ProcessStreamItem(connection, streamItem)); - break; + return ProcessStreamItem(connection, streamItem); case CompletionMessage streamCompleteMessage: // closes channels, removes from Lookup dict @@ -204,7 +197,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe { Log.UnexpectedStreamCompletion(_logger); } - return Task.CompletedTask; + break; // Other kind of message we weren't expecting default: @@ -212,19 +205,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe throw new NotSupportedException($"Received unsupported message: {hubMessage}"); } - return connection.ActiveInvocationLimit.WaitAsync(); - - static async Task ProcessTask(HubConnectionContext connection, Task task) - { - try - { - await task; - } - finally - { - connection.ActiveInvocationLimit.Release(); - } - } + return Task.CompletedTask; } private Task ProcessInvocationBindingFailure(HubConnectionContext connection, InvocationBindingFailureMessage bindingFailureMessage) diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 60ef1513def0..019f91decdf6 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -3079,7 +3079,13 @@ public async Task StreamingInvocationsDoNotBlockOtherInvocations() { using (StartVerifiableLog()) { - var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(null, LoggerFactory); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => + { + services.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 1; + }); + }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); using (var client = new TestClient(new NewtonsoftJsonHubProtocol())) @@ -3211,6 +3217,76 @@ public async Task InvocationsCanRunOutOfOrderWithParallelism() } } + [Fact] + public async Task PendingInvocationUnblockedWhenBlockingMethodCompletesWithParallelism() + { + using (StartVerifiableLog()) + { + var tcsService = new TcsService(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => + { + builder.AddSingleton(tcsService); + + builder.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 2; + }); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + // Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + + // Long running hub invocation to test that other invocations will not run until it is completed + await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout(); + // Wait for the long running method to start + await tcsService.StartedMethod.Task.OrTimeout(); + // Grab the tcs before resetting to use in the second long running method + var endTcs = tcsService.EndMethod; + + tcsService.Reset(); + // Long running hub invocation to test that other invocations will not run until it is completed + await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout(); + // Wait for the long running method to start + await tcsService.StartedMethod.Task.OrTimeout(); + + // Invoke another hub method which will wait for the first method to finish + await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout(); + // Both invocations should be waiting now + Assert.Null(client.TryRead()); + + // Release the second long running hub method + tcsService.EndMethod.TrySetResult(null); + + // Long running hub method result + var firstResult = await client.ReadAsync().OrTimeout(); + + var longRunningCompletion = Assert.IsType(firstResult); + Assert.Equal(12L, longRunningCompletion.Result); + + // simple hub method result + var secondResult = await client.ReadAsync().OrTimeout(); + + var simpleCompletion = Assert.IsType(secondResult); + Assert.Equal(21L, simpleCompletion.Result); + + // Release the first long running hub method + endTcs.TrySetResult(null); + + firstResult = await client.ReadAsync().OrTimeout(); + longRunningCompletion = Assert.IsType(firstResult); + Assert.Equal(12L, longRunningCompletion.Result); + + // Shut down + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + } + } + } + [Fact] public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming() { From 71ff67cc99f0d55f68d5f3cb3a606c3a6a032c07 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 6 Jul 2020 14:45:26 -0700 Subject: [PATCH 07/21] wip --- .../server/Core/src/HubConnectionHandler.cs | 4 +- .../Core/src/Internal/DefaultHubDispatcher.cs | 6 +- .../SignalR/test/HubConnectionHandlerTests.cs | 61 ++++++++++++++++++- 3 files changed, 66 insertions(+), 5 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index 33839c2365fd..2fcd639eaee0 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -331,11 +331,11 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) input.AdvanceTo(buffer.Start, buffer.End); } - static async Task DispatchMessage(HubConnectionContext connection, HubDispatcher dispatcher, HubMessage message) + static Task DispatchMessage(HubConnectionContext connection, HubDispatcher dispatcher, HubMessage message) { connection.StopClientTimeout(); _ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message)); - await connection.ActiveInvocationLimit.WaitAsync(); + return connection.ActiveInvocationLimit.WaitAsync(); } static async Task ProcessTask(HubConnectionContext connection, Task task) diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 39b4dc4e1d24..0a48d890ae63 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -351,6 +351,10 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, if (isStreamResponse) { + // TODO: Need to sync this with CancelInvocationMessage + cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); + connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); + var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); if (result == null) @@ -361,8 +365,6 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, return; } - cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); - connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); var enumerable = descriptor.FromReturnedStream(result, cts.Token); Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor); diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 019f91decdf6..c2037d325d58 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2962,6 +2962,8 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() // Connection is closed await connectionHandlerTask.OrTimeout(); + + tcsService.EndMethod.SetResult(null); } } } @@ -3245,8 +3247,8 @@ public async Task PendingInvocationUnblockedWhenBlockingMethodCompletesWithParal await tcsService.StartedMethod.Task.OrTimeout(); // Grab the tcs before resetting to use in the second long running method var endTcs = tcsService.EndMethod; - tcsService.Reset(); + // Long running hub invocation to test that other invocations will not run until it is completed await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout(); // Wait for the long running method to start @@ -3347,6 +3349,63 @@ public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming( } } + [Fact] + public async Task StreamInvocationsDoNotBlockOtherInvocationsWithParallelInvokes() + { + using (StartVerifiableLog()) + { + var tcsService = new TcsService(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => + { + builder.AddSingleton(tcsService); + builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>)); + + builder.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 2; + }); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + // Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + + // Long running hub invocation to test that other invocations will not run until it is completed + var streamInvocationId = await client.SendStreamInvocationAsync(nameof(LongRunningHub.LongRunningStream), null).OrTimeout(); + // Wait for the long running method to start + await tcsService.StartedMethod.Task.OrTimeout(); + + // Invoke another hub method which will wait for the first method to finish + await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout(); + + // simple hub method result + var result = await client.ReadAsync().OrTimeout(); + var simpleCompletion = Assert.IsType(result); + Assert.Equal(21L, simpleCompletion.Result); + + // Release the long running hub method + tcsService.EndMethod.TrySetResult(null); + + var hubActivator = serviceProvider.GetService>() as CustomHubActivator; + + await client.SendHubMessageAsync(new CancelInvocationMessage(streamInvocationId)).OrTimeout(); + + // Completion message for canceled Stream + await client.ReadAsync().OrTimeout(); + + // Shut down + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + + // OnConnectedAsync, SimpleMethod, LongRunningStream, OnDisconnectedAsync + Assert.Equal(4, hubActivator.ReleaseCount); + } + } + } + [Fact] public async Task ServerSendsCloseWithErrorWhenConnectionClosedWithPartialMessage() { From 50ec28aa3cd2d610372f43312e4c716b76dabc60 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 7 Jul 2020 11:31:08 -0700 Subject: [PATCH 08/21] stash --- .../server/Core/src/HubConnectionContext.cs | 2 + .../server/Core/src/HubConnectionHandler.cs | 61 +++++++++++++------ .../SignalR/test/HubConnectionHandlerTests.cs | 8 ++- 3 files changed, 48 insertions(+), 23 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 2f63512f6852..358b241bcb30 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -76,6 +76,7 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo var maxInvokes = contextOptions.MaximumParallelInvocations; ActiveInvocationLimit = new SemaphoreSlim(maxInvokes - 1, maxInvokes); + MaxInvokes = contextOptions.MaximumParallelInvocations; } internal StreamTracker StreamTracker @@ -96,6 +97,7 @@ internal StreamTracker StreamTracker internal Exception? CloseException { get; private set; } internal SemaphoreSlim ActiveInvocationLimit { get; } + internal int MaxInvokes { get; } /// /// Gets a that notifies when the connection is aborted. diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index 2fcd639eaee0..d4db57bc5742 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; using Microsoft.AspNetCore.Internal; @@ -243,6 +244,22 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) var binder = new HubConnectionBinder(_dispatcher, connection); + var channel = Channel.CreateBounded(new BoundedChannelOptions(connection.MaxInvokes)); + + for (var i = 0; i < connection.MaxInvokes; i++) + { + _ = DispatchChannel(channel.Reader, connection, _dispatcher); + } + + static async Task DispatchChannel(ChannelReader reader, HubConnectionContext connection, HubDispatcher dispatcher) + { + while (true) + { + var message = await reader.ReadAsync(); + await dispatcher.DispatchMessageAsync(connection, message); + } + } + while (true) { var result = await input.ReadAsync(); @@ -263,8 +280,10 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) { while (protocol.TryParseMessage(ref buffer, binder, out var message)) { + connection.StopClientTimeout(); messageReceived = true; - await DispatchMessage(connection, _dispatcher, message); + await channel.Writer.WriteAsync(message); + //await DispatchMessage(connection, _dispatcher, message); } if (messageReceived) @@ -290,8 +309,10 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) if (protocol.TryParseMessage(ref segment, binder, out var message)) { + connection.StopClientTimeout(); messageReceived = true; - await DispatchMessage(connection, _dispatcher, message); + await channel.Writer.WriteAsync(message); + //await DispatchMessage(connection, _dispatcher, message); } else if (overLength) { @@ -331,24 +352,24 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) input.AdvanceTo(buffer.Start, buffer.End); } - static Task DispatchMessage(HubConnectionContext connection, HubDispatcher dispatcher, HubMessage message) - { - connection.StopClientTimeout(); - _ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message)); - return connection.ActiveInvocationLimit.WaitAsync(); - } - - static async Task ProcessTask(HubConnectionContext connection, Task task) - { - try - { - await task; - } - finally - { - connection.ActiveInvocationLimit.Release(); - } - } + //static Task DispatchMessage(HubConnectionContext connection, HubDispatcher dispatcher, HubMessage message) + //{ + // connection.StopClientTimeout(); + // _ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message)); + // return connection.ActiveInvocationLimit.WaitAsync(); + //} + + //static async Task ProcessTask(HubConnectionContext connection, Task task) + //{ + // try + // { + // await task; + // } + // finally + // { + // connection.ActiveInvocationLimit.Release(); + // } + //} } } diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index c2037d325d58..42a1403c6725 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -400,9 +400,7 @@ public async Task SendingInvocatonInChunksWorks() await client.Connection.Application.Output.WriteAsync(part3); - Assert.True(task.IsCompleted); - - var completionMessage = await task as CompletionMessage; + var completionMessage = await task.OrTimeout() as CompletionMessage; Assert.NotNull(completionMessage); Assert.Equal("hello", completionMessage.Result); Assert.Equal("1", completionMessage.InvocationId); @@ -2796,6 +2794,7 @@ public async Task ConnectionTimesOutIfInitialPingAndThenNoMessages() var connectionHandlerTask = await client.ConnectAsync(connectionHandler); await client.Connected.OrTimeout(); await client.SendHubMessageAsync(PingMessage.Instance); + await client.InvokeAsync(nameof(MethodHub.ValueMethod)); clock.UtcNow = clock.UtcNow.AddMilliseconds(timeout + 1); client.TickHeartbeat(); @@ -2944,6 +2943,9 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() var hubMethodTask = client.InvokeAsync(nameof(LongRunningHub.LongRunningMethod)); await tcsService.StartedMethod.Task.OrTimeout(); + await client.SendHubMessageAsync(PingMessage.Instance); + await client.SendHubMessageAsync(PingMessage.Instance); + // Tick heartbeat while hub method is running to show that close isn't triggered client.TickHeartbeat(); From 1cc328badd65e3e1333a2d6cdd7c05316f710a44 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 10 Jul 2020 16:31:36 -0700 Subject: [PATCH 09/21] stash --- .../server/Core/src/HubConnectionHandler.cs | 1 + .../server/Core/src/Internal/Class1.cs | 31 +++++++++++++++++++ 2 files changed, 32 insertions(+) create mode 100644 src/SignalR/server/Core/src/Internal/Class1.cs diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index d4db57bc5742..568c76fc5760 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -282,6 +282,7 @@ static async Task DispatchChannel(ChannelReader reader, HubConnectio { connection.StopClientTimeout(); messageReceived = true; + await connection.ActiveInvocationLimit.WaitToStartAsync((state) => Task.CompletedTask, (object)null); await channel.Writer.WriteAsync(message); //await DispatchMessage(connection, _dispatcher, message); } diff --git a/src/SignalR/server/Core/src/Internal/Class1.cs b/src/SignalR/server/Core/src/Internal/Class1.cs new file mode 100644 index 000000000000..58d509566b11 --- /dev/null +++ b/src/SignalR/server/Core/src/Internal/Class1.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.SignalR.Protocol; + +namespace Microsoft.AspNetCore.SignalR.Internal +{ + public static class MyCoolAPI + { + public static async Task WaitToStartAsync(this SemaphoreSlim semaphoreSlim, Func func, TState state) + { + await semaphoreSlim.WaitAsync(); + _ = RunTask(func, semaphoreSlim, state); + + static async Task RunTask(Func func, SemaphoreSlim semaphoreSlim, TState state) + { + try + { + await func(state); + } + finally + { + semaphoreSlim.Release(); + } + } + } + } +} From 0d7d6f43968083e0e74a3ef14b228b28b65e0a35 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 10 Jul 2020 17:31:15 -0700 Subject: [PATCH 10/21] beautiful --- .../server/Core/src/HubConnectionHandler.cs | 4 +-- .../server/Core/src/Internal/Class1.cs | 31 ------------------- .../Core/src/Internal/DefaultHubDispatcher.cs | 11 ++++++- .../src/Internal/SemaphoreSlimExtensions.cs | 27 ++++++++++++++++ 4 files changed, 39 insertions(+), 34 deletions(-) delete mode 100644 src/SignalR/server/Core/src/Internal/Class1.cs create mode 100644 src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index 568c76fc5760..d3a26ba4eae8 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -282,8 +282,8 @@ static async Task DispatchChannel(ChannelReader reader, HubConnectio { connection.StopClientTimeout(); messageReceived = true; - await connection.ActiveInvocationLimit.WaitToStartAsync((state) => Task.CompletedTask, (object)null); - await channel.Writer.WriteAsync(message); + await _dispatcher.DispatchMessageAsync(connection, message); + //await channel.Writer.WriteAsync(message); //await DispatchMessage(connection, _dispatcher, message); } diff --git a/src/SignalR/server/Core/src/Internal/Class1.cs b/src/SignalR/server/Core/src/Internal/Class1.cs deleted file mode 100644 index 58d509566b11..000000000000 --- a/src/SignalR/server/Core/src/Internal/Class1.cs +++ /dev/null @@ -1,31 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Microsoft.AspNetCore.SignalR.Protocol; - -namespace Microsoft.AspNetCore.SignalR.Internal -{ - public static class MyCoolAPI - { - public static async Task WaitToStartAsync(this SemaphoreSlim semaphoreSlim, Func func, TState state) - { - await semaphoreSlim.WaitAsync(); - _ = RunTask(func, semaphoreSlim, state); - - static async Task RunTask(Func func, SemaphoreSlim semaphoreSlim, TState state) - { - try - { - await func(state); - } - finally - { - semaphoreSlim.Release(); - } - } - } - } -} diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 0a48d890ae63..131c9f04c3d5 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -158,10 +158,19 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe case InvocationMessage invocationMessage: Log.ReceivedHubInvocation(_logger, invocationMessage); - return ProcessInvocation(connection, invocationMessage, isStreamResponse: false); + return connection.ActiveInvocationLimit.RunAsync(state => + { + var (dispatcher, connection, invocationMessage) = state; + return dispatcher.ProcessInvocation(connection, invocationMessage, isStreamResponse: false); + }, (this, connection, invocationMessage)); case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); + //return connection.ActiveInvocationLimit.RunAsync(state => + //{ + // var (dispatcher, connection, streamInvocationMessage) = state; + // return dispatcher.ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); + //}, (this, connection, streamInvocationMessage)); return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); case CancelInvocationMessage cancelInvocationMessage: diff --git a/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs b/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs new file mode 100644 index 000000000000..689038a6cc8e --- /dev/null +++ b/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs @@ -0,0 +1,27 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Microsoft.AspNetCore.SignalR.Internal +{ + internal static class SemaphoreSlimExtensions + { + public static async Task RunAsync(this SemaphoreSlim semaphoreSlim, Func callback, TState state) + { + await semaphoreSlim.WaitAsync(); + return RunTask(callback, semaphoreSlim, state); + + static async Task RunTask(Func callback, SemaphoreSlim semaphoreSlim, TState state) + { + try + { + await callback(state); + } + finally + { + semaphoreSlim.Release(); + } + } + } + } +} From 40e50c34de3919e943682384acb275847e06533d Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 13 Jul 2020 11:09:24 -0700 Subject: [PATCH 11/21] fix --- src/SignalR/server/Core/src/HubConnectionContext.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 358b241bcb30..5a7dcf0f348f 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -75,7 +75,7 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo _lastSendTimeStamp = _systemClock.UtcNowTicks; var maxInvokes = contextOptions.MaximumParallelInvocations; - ActiveInvocationLimit = new SemaphoreSlim(maxInvokes - 1, maxInvokes); + ActiveInvocationLimit = new SemaphoreSlim(maxInvokes, maxInvokes); MaxInvokes = contextOptions.MaximumParallelInvocations; } From a2b3fb693931aea2e75b1da39e0505a76de07962 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Tue, 14 Jul 2020 13:14:34 -0700 Subject: [PATCH 12/21] wip --- .../server/Core/src/HubConnectionContext.cs | 2 - .../server/Core/src/HubConnectionHandler.cs | 40 +------- .../Core/src/Internal/DefaultHubDispatcher.cs | 91 ++++++++++--------- .../HubConnectionHandlerTestUtils/Hubs.cs | 16 ++++ .../SignalR/test/HubConnectionHandlerTests.cs | 70 +++++++++++++- 5 files changed, 130 insertions(+), 89 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 5a7dcf0f348f..afcf0f02cd81 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -76,7 +76,6 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo var maxInvokes = contextOptions.MaximumParallelInvocations; ActiveInvocationLimit = new SemaphoreSlim(maxInvokes, maxInvokes); - MaxInvokes = contextOptions.MaximumParallelInvocations; } internal StreamTracker StreamTracker @@ -97,7 +96,6 @@ internal StreamTracker StreamTracker internal Exception? CloseException { get; private set; } internal SemaphoreSlim ActiveInvocationLimit { get; } - internal int MaxInvokes { get; } /// /// Gets a that notifies when the connection is aborted. diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index d3a26ba4eae8..fab814f80738 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -244,22 +244,6 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) var binder = new HubConnectionBinder(_dispatcher, connection); - var channel = Channel.CreateBounded(new BoundedChannelOptions(connection.MaxInvokes)); - - for (var i = 0; i < connection.MaxInvokes; i++) - { - _ = DispatchChannel(channel.Reader, connection, _dispatcher); - } - - static async Task DispatchChannel(ChannelReader reader, HubConnectionContext connection, HubDispatcher dispatcher) - { - while (true) - { - var message = await reader.ReadAsync(); - await dispatcher.DispatchMessageAsync(connection, message); - } - } - while (true) { var result = await input.ReadAsync(); @@ -283,8 +267,6 @@ static async Task DispatchChannel(ChannelReader reader, HubConnectio connection.StopClientTimeout(); messageReceived = true; await _dispatcher.DispatchMessageAsync(connection, message); - //await channel.Writer.WriteAsync(message); - //await DispatchMessage(connection, _dispatcher, message); } if (messageReceived) @@ -312,8 +294,7 @@ static async Task DispatchChannel(ChannelReader reader, HubConnectio { connection.StopClientTimeout(); messageReceived = true; - await channel.Writer.WriteAsync(message); - //await DispatchMessage(connection, _dispatcher, message); + await _dispatcher.DispatchMessageAsync(connection, message); } else if (overLength) { @@ -352,25 +333,6 @@ static async Task DispatchChannel(ChannelReader reader, HubConnectio // before yielding the read again. input.AdvanceTo(buffer.Start, buffer.End); } - - //static Task DispatchMessage(HubConnectionContext connection, HubDispatcher dispatcher, HubMessage message) - //{ - // connection.StopClientTimeout(); - // _ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message)); - // return connection.ActiveInvocationLimit.WaitAsync(); - //} - - //static async Task ProcessTask(HubConnectionContext connection, Task task) - //{ - // try - // { - // await task; - // } - // finally - // { - // connection.ActiveInvocationLimit.Release(); - // } - //} } } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 131c9f04c3d5..edbbe678693e 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -146,6 +146,7 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe { // Messages are dispatched sequentially and will stop other messages from being processed until they complete. // Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run. + // With parallel invokes enabled, messages run sequentially until they go async and then the next message will be allowed to start running. switch (hubMessage) @@ -166,11 +167,6 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - //return connection.ActiveInvocationLimit.RunAsync(state => - //{ - // var (dispatcher, connection, streamInvocationMessage) = state; - // return dispatcher.ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); - //}, (this, connection, streamInvocationMessage)); return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); case CancelInvocationMessage cancelInvocationMessage: @@ -314,48 +310,11 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, InitializeHub(hub, connection); Task invocation = null; - CancellationTokenSource cts = null; var arguments = hubMethodInvocationMessage.Arguments; + CancellationTokenSource cts = null; if (descriptor.HasSyntheticArguments) { - // In order to add the synthetic arguments we need a new array because the invocation array is too small (it doesn't know about synthetic arguments) - arguments = new object[descriptor.OriginalParameterTypes.Count]; - - var streamPointer = 0; - var hubInvocationArgumentPointer = 0; - for (var parameterPointer = 0; parameterPointer < arguments.Length; parameterPointer++) - { - if (hubMethodInvocationMessage.Arguments.Length > hubInvocationArgumentPointer && - (hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer] == null || - descriptor.OriginalParameterTypes[parameterPointer].IsAssignableFrom(hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer].GetType()))) - { - // The types match so it isn't a synthetic argument, just copy it into the arguments array - arguments[parameterPointer] = hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer]; - hubInvocationArgumentPointer++; - } - else - { - if (descriptor.OriginalParameterTypes[parameterPointer] == typeof(CancellationToken)) - { - cts = CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); - arguments[parameterPointer] = cts.Token; - } - else if (isStreamCall && ReflectionHelper.IsStreamingType(descriptor.OriginalParameterTypes[parameterPointer], mustBeDirectType: true)) - { - Log.StartingParameterStream(_logger, hubMethodInvocationMessage.StreamIds[streamPointer]); - var itemType = descriptor.StreamingParameters[streamPointer]; - arguments[parameterPointer] = connection.StreamTracker.AddStream(hubMethodInvocationMessage.StreamIds[streamPointer], - itemType, descriptor.OriginalParameterTypes[parameterPointer]); - - streamPointer++; - } - else - { - // This should never happen - Debug.Assert(false, $"Failed to bind argument of type '{descriptor.OriginalParameterTypes[parameterPointer].Name}' for hub method '{methodExecutor.MethodInfo.Name}'."); - } - } - } + ReplaceArguments(descriptor, hubMethodInvocationMessage, isStreamCall, connection, ref arguments, out cts); } if (isStreamResponse) @@ -623,6 +582,50 @@ await connection.WriteAsync(CompletionMessage.WithError(hubMethodInvocationMessa return true; } + private void ReplaceArguments(HubMethodDescriptor descriptor, HubMethodInvocationMessage hubMethodInvocationMessage, bool isStreamCall, + HubConnectionContext connection, ref object[] arguments, out CancellationTokenSource cts) + { + cts = null; + // In order to add the synthetic arguments we need a new array because the invocation array is too small (it doesn't know about synthetic arguments) + arguments = new object[descriptor.OriginalParameterTypes.Count]; + + var streamPointer = 0; + var hubInvocationArgumentPointer = 0; + for (var parameterPointer = 0; parameterPointer < arguments.Length; parameterPointer++) + { + if (hubMethodInvocationMessage.Arguments.Length > hubInvocationArgumentPointer && + (hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer] == null || + descriptor.OriginalParameterTypes[parameterPointer].IsAssignableFrom(hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer].GetType()))) + { + // The types match so it isn't a synthetic argument, just copy it into the arguments array + arguments[parameterPointer] = hubMethodInvocationMessage.Arguments[hubInvocationArgumentPointer]; + hubInvocationArgumentPointer++; + } + else + { + if (descriptor.OriginalParameterTypes[parameterPointer] == typeof(CancellationToken)) + { + cts = CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); + arguments[parameterPointer] = cts.Token; + } + else if (isStreamCall && ReflectionHelper.IsStreamingType(descriptor.OriginalParameterTypes[parameterPointer], mustBeDirectType: true)) + { + Log.StartingParameterStream(_logger, hubMethodInvocationMessage.StreamIds[streamPointer]); + var itemType = descriptor.StreamingParameters[streamPointer]; + arguments[parameterPointer] = connection.StreamTracker.AddStream(hubMethodInvocationMessage.StreamIds[streamPointer], + itemType, descriptor.OriginalParameterTypes[parameterPointer]); + + streamPointer++; + } + else + { + // This should never happen + Debug.Assert(false, $"Failed to bind argument of type '{descriptor.OriginalParameterTypes[parameterPointer].Name}' for hub method '{descriptor.MethodExecutor.MethodInfo.Name}'."); + } + } + } + } + private void DiscoverHubMethods() { var hubType = typeof(THub); diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs index c2a4893fd4fb..70d79251f289 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs @@ -239,6 +239,22 @@ public async Task> UploadArray(ChannelReader source) return results; } + [Authorize("test")] + public async Task> UploadArrayAuth(ChannelReader source) + { + var results = new List(); + + while (await source.WaitToReadAsync()) + { + while (source.TryRead(out var item)) + { + results.Add(item); + } + } + + return results; + } + public async Task TestTypeCastingErrors(ChannelReader source) { try diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 42a1403c6725..2f3ddd7027e9 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2943,8 +2943,9 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() var hubMethodTask = client.InvokeAsync(nameof(LongRunningHub.LongRunningMethod)); await tcsService.StartedMethod.Task.OrTimeout(); - await client.SendHubMessageAsync(PingMessage.Instance); - await client.SendHubMessageAsync(PingMessage.Instance); + // Invoke another hub method (which will be blocked by the first method) in order to stop the timeout + // This is how a real-world example would behave + await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod)); // Tick heartbeat while hub method is running to show that close isn't triggered client.TickHeartbeat(); @@ -2964,8 +2965,6 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() // Connection is closed await connectionHandlerTask.OrTimeout(); - - tcsService.EndMethod.SetResult(null); } } } @@ -3561,6 +3560,69 @@ public async Task UploadManyStreams() } } + private class DelayRequirement : AuthorizationHandler, IAuthorizationRequirement + { + private readonly TcsService _tcsService; + public DelayRequirement(TcsService tcsService) + { + _tcsService = tcsService; + } + + protected override async Task HandleRequirementAsync(AuthorizationHandlerContext context, DelayRequirement requirement, HubInvocationContext resource) + { + _tcsService.StartedMethod.SetResult(null); + await _tcsService.EndMethod.Task; + context.Succeed(requirement); + } + } + + [Fact] + public async Task UploadStreamParallel() + { + // Use Auth as the delay injection point because it is one of the first things to run after the invocation message has been parsed + var tcsService = new TcsService(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => + { + services.AddSignalR(options => + { + options.MaxParallelInvocationsPerClient = 1; + }); + + services.AddAuthorization(options => + { + options.AddPolicy("test", policy => + { + policy.Requirements.Add(new DelayRequirement(tcsService)); + }); + }); + }); + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + await client.BeginUploadStreamAsync("invocation", nameof(MethodHub.UploadArrayAuth), new[] { "id" }, Array.Empty()); + await tcsService.StartedMethod.Task.OrTimeout(); + + var objects = new[] { new SampleObject("solo", 322), new SampleObject("ggez", 3145) }; + foreach (var thing in objects) + { + await client.SendHubMessageAsync(new StreamItemMessage("id", thing)).OrTimeout(); + } + + tcsService.EndMethod.SetResult(null); + + await client.SendHubMessageAsync(CompletionMessage.Empty("id")).OrTimeout(); + var response = (CompletionMessage)await client.ReadAsync().OrTimeout(); + var result = ((JArray)response.Result).ToArray(); + + Assert.Equal(objects[0].Foo, ((JContainer)result[0])["foo"]); + Assert.Equal(objects[0].Bar, ((JContainer)result[0])["bar"]); + Assert.Equal(objects[1].Foo, ((JContainer)result[1])["foo"]); + Assert.Equal(objects[1].Bar, ((JContainer)result[1])["bar"]); + } + } + [Fact] public async Task ConnectionAbortedIfSendFailsWithProtocolError() { From 4ced6abc7ec99545771288abdad2739ff8ab0068 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Wed, 15 Jul 2020 10:20:16 -0700 Subject: [PATCH 13/21] possible --- .../Core/src/Internal/DefaultHubDispatcher.cs | 52 +++++++++++-------- .../SignalR/test/HubConnectionHandlerTests.cs | 8 +-- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index edbbe678693e..57ea741a1ca9 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -167,7 +167,11 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); + return connection.ActiveInvocationLimit.RunAsync(state => + { + var (dispatcher, connection, invocationMessage) = state; + return dispatcher.ProcessInvocation(connection, invocationMessage, isStreamResponse: true); + }, (this, connection, streamInvocationMessage)); case CancelInvocationMessage cancelInvocationMessage: // Check if there is an associated active stream and cancel it if it exists. @@ -278,6 +282,30 @@ private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext c THub hub = null; try { + var clientStreamLength = hubMethodInvocationMessage.StreamIds?.Length ?? 0; + var serverStreamLength = descriptor.StreamingParameters?.Count ?? 0; + if (clientStreamLength != serverStreamLength) + { + var ex = new HubException($"Client sent {clientStreamLength} stream(s), Hub method expects {serverStreamLength}."); + Log.InvalidHubParameters(_logger, hubMethodInvocationMessage.Target, ex); + await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, + ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors)); + return; + } + + var arguments = hubMethodInvocationMessage.Arguments; + CancellationTokenSource cts = null; + if (descriptor.HasSyntheticArguments) + { + ReplaceArguments(descriptor, hubMethodInvocationMessage, isStreamCall, connection, ref arguments, out cts); + } + + if (isStreamResponse) + { + cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); + connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); + } + hubActivator = scope.ServiceProvider.GetRequiredService>(); hub = hubActivator.Create(); @@ -296,33 +324,11 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, try { - var clientStreamLength = hubMethodInvocationMessage.StreamIds?.Length ?? 0; - var serverStreamLength = descriptor.StreamingParameters?.Count ?? 0; - if (clientStreamLength != serverStreamLength) - { - var ex = new HubException($"Client sent {clientStreamLength} stream(s), Hub method expects {serverStreamLength}."); - Log.InvalidHubParameters(_logger, hubMethodInvocationMessage.Target, ex); - await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, - ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors)); - return; - } - InitializeHub(hub, connection); Task invocation = null; - var arguments = hubMethodInvocationMessage.Arguments; - CancellationTokenSource cts = null; - if (descriptor.HasSyntheticArguments) - { - ReplaceArguments(descriptor, hubMethodInvocationMessage, isStreamCall, connection, ref arguments, out cts); - } - if (isStreamResponse) { - // TODO: Need to sync this with CancelInvocationMessage - cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); - connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); - var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); if (result == null) diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 2f3ddd7027e9..1afd3a496837 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -3577,17 +3577,13 @@ protected override async Task HandleRequirementAsync(AuthorizationHandlerContext } [Fact] - public async Task UploadStreamParallel() + // Test to check if StreamItems can be processed before the Stream from the invocation is properly registered internally + public async Task UploadStreamStreamItemsSentAsSoonAsPossible() { // Use Auth as the delay injection point because it is one of the first things to run after the invocation message has been parsed var tcsService = new TcsService(); var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => { - services.AddSignalR(options => - { - options.MaxParallelInvocationsPerClient = 1; - }); - services.AddAuthorization(options => { options.AddPolicy("test", policy => From 63edb9b977bd802ed24264f43f1655f81fa242c7 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 16 Jul 2020 10:28:42 -0700 Subject: [PATCH 14/21] things --- .../Core/src/Internal/DefaultHubDispatcher.cs | 96 +++++++-------- .../HubConnectionHandlerTestUtils/Hubs.cs | 7 ++ .../SignalR/test/HubConnectionHandlerTests.cs | 110 +++++++----------- 3 files changed, 96 insertions(+), 117 deletions(-) diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 57ea741a1ca9..ec39b31eef96 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -159,19 +159,11 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe case InvocationMessage invocationMessage: Log.ReceivedHubInvocation(_logger, invocationMessage); - return connection.ActiveInvocationLimit.RunAsync(state => - { - var (dispatcher, connection, invocationMessage) = state; - return dispatcher.ProcessInvocation(connection, invocationMessage, isStreamResponse: false); - }, (this, connection, invocationMessage)); + return ProcessInvocation(connection, invocationMessage, isStreamResponse: false); case StreamInvocationMessage streamInvocationMessage: Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage); - return connection.ActiveInvocationLimit.RunAsync(state => - { - var (dispatcher, connection, invocationMessage) = state; - return dispatcher.ProcessInvocation(connection, invocationMessage, isStreamResponse: true); - }, (this, connection, streamInvocationMessage)); + return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true); case CancelInvocationMessage cancelInvocationMessage: // Check if there is an associated active stream and cancel it if it exists. @@ -267,7 +259,18 @@ private Task ProcessInvocation(HubConnectionContext connection, else { bool isStreamCall = descriptor.StreamingParameters != null; - return Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamResponse, isStreamCall); + if (!isStreamCall && !isStreamResponse) + { + return connection.ActiveInvocationLimit.RunAsync(state => + { + var (dispatcher, descriptor, connection, invocationMessage) = state; + return dispatcher.Invoke(descriptor, connection, invocationMessage, isStreamResponse: false, isStreamCall: false); + }, (this, descriptor, connection, hubMethodInvocationMessage)); + } + else + { + return Invoke(descriptor, connection, hubMethodInvocationMessage, isStreamResponse, isStreamCall); + } } } @@ -282,30 +285,6 @@ private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext c THub hub = null; try { - var clientStreamLength = hubMethodInvocationMessage.StreamIds?.Length ?? 0; - var serverStreamLength = descriptor.StreamingParameters?.Count ?? 0; - if (clientStreamLength != serverStreamLength) - { - var ex = new HubException($"Client sent {clientStreamLength} stream(s), Hub method expects {serverStreamLength}."); - Log.InvalidHubParameters(_logger, hubMethodInvocationMessage.Target, ex); - await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, - ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors)); - return; - } - - var arguments = hubMethodInvocationMessage.Arguments; - CancellationTokenSource cts = null; - if (descriptor.HasSyntheticArguments) - { - ReplaceArguments(descriptor, hubMethodInvocationMessage, isStreamCall, connection, ref arguments, out cts); - } - - if (isStreamResponse) - { - cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); - connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); - } - hubActivator = scope.ServiceProvider.GetRequiredService>(); hub = hubActivator.Create(); @@ -324,25 +303,50 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, try { + var clientStreamLength = hubMethodInvocationMessage.StreamIds?.Length ?? 0; + var serverStreamLength = descriptor.StreamingParameters?.Count ?? 0; + if (clientStreamLength != serverStreamLength) + { + var ex = new HubException($"Client sent {clientStreamLength} stream(s), Hub method expects {serverStreamLength}."); + Log.InvalidHubParameters(_logger, hubMethodInvocationMessage.Target, ex); + await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, + ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors)); + return; + } + InitializeHub(hub, connection); Task invocation = null; - if (isStreamResponse) + var arguments = hubMethodInvocationMessage.Arguments; + CancellationTokenSource cts = null; + if (descriptor.HasSyntheticArguments) { - var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); + ReplaceArguments(descriptor, hubMethodInvocationMessage, isStreamCall, connection, ref arguments, out cts); + } - if (result == null) + if (isStreamResponse) + { + _ = ExecuteStreamInvocation(); + async Task ExecuteStreamInvocation() { - Log.InvalidReturnValueFromStreamingMethod(_logger, methodExecutor.MethodInfo.Name); - await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, - $"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>."); - return; - } + cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); + connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); - var enumerable = descriptor.FromReturnedStream(result, cts.Token); + var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); - Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor); - _ = StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerable, scope, hubActivator, hub, cts, hubMethodInvocationMessage); + if (result == null) + { + Log.InvalidReturnValueFromStreamingMethod(_logger, methodExecutor.MethodInfo.Name); + await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, + $"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>."); + return; + } + + var enumerable = descriptor.FromReturnedStream(result, cts.Token); + + Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor); + await StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerable, scope, hubActivator, hub, cts, hubMethodInvocationMessage); + } } else { diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs index 70d79251f289..1fa0ec94c740 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs @@ -1026,6 +1026,13 @@ public int SimpleMethod() return 21; } + public async Task Upload(ChannelReader stream) + { + _tcsService.StartedMethod.SetResult(null); + _ = await stream.ReadAndCollectAllAsync(); + _tcsService.EndMethod.SetResult(null); + } + private class CustomAsyncEnumerable : IAsyncEnumerable { private readonly TcsService _tcsService; diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 1afd3a496837..6bc112fb02d6 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -3291,7 +3291,7 @@ public async Task PendingInvocationUnblockedWhenBlockingMethodCompletesWithParal } [Fact] - public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming() + public async Task StreamInvocationsDoNotBlockOtherInvocations() { using (StartVerifiableLog()) { @@ -3308,83 +3308,20 @@ public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming( }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); - // Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call - using (var client = new TestClient()) - { - var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); - - // Long running hub invocation to test that other invocations will not run until it is completed - var streamInvocationId = await client.SendStreamInvocationAsync(nameof(LongRunningHub.LongRunningStream), null).OrTimeout(); - // Wait for the long running method to start - await tcsService.StartedMethod.Task.OrTimeout(); - - // Invoke another hub method which will wait for the first method to finish - await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout(); - // Both invocations should be waiting now - Assert.Null(client.TryRead()); - - // Release the long running hub method - tcsService.EndMethod.TrySetResult(null); - - // simple hub method result - var result = await client.ReadAsync().OrTimeout(); - - var simpleCompletion = Assert.IsType(result); - Assert.Equal(21L, simpleCompletion.Result); - - var hubActivator = serviceProvider.GetService>() as CustomHubActivator; - - await client.SendHubMessageAsync(new CancelInvocationMessage(streamInvocationId)).OrTimeout(); - - // Completion message for canceled Stream - await client.ReadAsync().OrTimeout(); - - // Shut down - client.Dispose(); - - await connectionHandlerTask.OrTimeout(); - - // OnConnectedAsync, SimpleMethod, LongRunningStream, OnDisconnectedAsync - Assert.Equal(4, hubActivator.ReleaseCount); - } - } - } - - [Fact] - public async Task StreamInvocationsDoNotBlockOtherInvocationsWithParallelInvokes() - { - using (StartVerifiableLog()) - { - var tcsService = new TcsService(); - var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => - { - builder.AddSingleton(tcsService); - builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>)); - - builder.AddSignalR(options => - { - options.MaxParallelInvocationsPerClient = 2; - }); - }, LoggerFactory); - var connectionHandler = serviceProvider.GetService>(); - - // Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call + // Because we use PipeScheduler.Inline the hub invocations will run inline until they go async using (var client = new TestClient()) { var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); - // Long running hub invocation to test that other invocations will not run until it is completed + // Long running stream invocation to test that other invocations can still run before it is completed var streamInvocationId = await client.SendStreamInvocationAsync(nameof(LongRunningHub.LongRunningStream), null).OrTimeout(); // Wait for the long running method to start await tcsService.StartedMethod.Task.OrTimeout(); - // Invoke another hub method which will wait for the first method to finish - await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout(); - - // simple hub method result - var result = await client.ReadAsync().OrTimeout(); - var simpleCompletion = Assert.IsType(result); - Assert.Equal(21L, simpleCompletion.Result); + // Invoke another hub method which will be able to run even though a streaming method is still running + var completion = await client.InvokeAsync(nameof(LongRunningHub.SimpleMethod)).OrTimeout(); + Assert.Null(completion.Error); + Assert.Equal(21L, completion.Result); // Release the long running hub method tcsService.EndMethod.TrySetResult(null); @@ -3394,7 +3331,8 @@ public async Task StreamInvocationsDoNotBlockOtherInvocationsWithParallelInvokes await client.SendHubMessageAsync(new CancelInvocationMessage(streamInvocationId)).OrTimeout(); // Completion message for canceled Stream - await client.ReadAsync().OrTimeout(); + completion = Assert.IsType(await client.ReadAsync().OrTimeout()); + Assert.Equal(streamInvocationId, completion.InvocationId); // Shut down client.Dispose(); @@ -3619,6 +3557,36 @@ public async Task UploadStreamStreamItemsSentAsSoonAsPossible() } } + [Fact] + public async Task UploadStreamDoesNotCountTowardsMaxInvocationLimit() + { + var tcsService = new TcsService(); + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => + { + services.AddSignalR(options => options.MaxParallelInvocationsPerClient = 1); + services.AddSingleton(tcsService); + }); + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout(); + await client.BeginUploadStreamAsync("invocation", nameof(LongRunningHub.Upload), new[] { "id" }, Array.Empty()); + await tcsService.StartedMethod.Task.OrTimeout(); + + var completion = await client.InvokeAsync(nameof(LongRunningHub.SimpleMethod)).OrTimeout(); + Assert.Null(completion.Error); + Assert.Equal(21L, completion.Result); + + await client.SendHubMessageAsync(CompletionMessage.Empty("id")).OrTimeout(); + + await tcsService.EndMethod.Task.OrTimeout(); + var response = (CompletionMessage)await client.ReadAsync().OrTimeout(); + Assert.Null(response.Result); + Assert.Null(response.Error); + } + } + [Fact] public async Task ConnectionAbortedIfSendFailsWithProtocolError() { From 79280df65542030dc234989f7a65682d3f40c594 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 16 Jul 2020 12:23:10 -0700 Subject: [PATCH 15/21] fix --- src/SignalR/server/Core/src/HubOptions.cs | 2 +- .../Core/src/Internal/DefaultHubDispatcher.cs | 13 ++++++++++++- .../Core/src/Internal/SemaphoreSlimExtensions.cs | 3 +++ src/SignalR/server/SignalR/test/AddSignalRTests.cs | 13 +++++++++++++ .../SignalR/test/HubConnectionHandlerTests.cs | 3 +-- 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index 05db3697adc2..527a6076e129 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -56,7 +56,7 @@ public class HubOptions /// /// By default a client is only allowed to invoke a single Hub method at a time. - /// Changing this property will allow clients to invoke multiple invocations at the same time before queueing. + /// Changing this property will allow clients to invoke multiple methods at the same time before queueing. /// public int MaxParallelInvocationsPerClient { diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index ec39b31eef96..fcd00d528cb4 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -331,8 +331,19 @@ async Task ExecuteStreamInvocation() { cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); + object result; - var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); + try + { + result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); + } + catch (Exception ex) + { + Log.FailedInvokingHubMethod(_logger, hubMethodInvocationMessage.Target, ex); + await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, + ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors)); + return; + } if (result == null) { diff --git a/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs b/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs index 689038a6cc8e..8f0d9e713847 100644 --- a/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs +++ b/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs @@ -1,3 +1,6 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information. + using System; using System.Threading; using System.Threading.Tasks; diff --git a/src/SignalR/server/SignalR/test/AddSignalRTests.cs b/src/SignalR/server/SignalR/test/AddSignalRTests.cs index fa8f97e1927d..437293be4bf4 100644 --- a/src/SignalR/server/SignalR/test/AddSignalRTests.cs +++ b/src/SignalR/server/SignalR/test/AddSignalRTests.cs @@ -7,6 +7,7 @@ using System.Threading; using System.Threading.Tasks; using Microsoft.AspNetCore.Connections; +using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR.Internal; using Microsoft.AspNetCore.SignalR.Protocol; using Microsoft.Extensions.DependencyInjection; @@ -78,11 +79,15 @@ public void HubSpecificOptionsDoNotAffectGlobalHubOptions() serviceCollection.AddSignalR().AddHubOptions(options => { options.SupportedProtocols.Clear(); + options.AddFilter(new CustomHubFilter()); }); var serviceProvider = serviceCollection.BuildServiceProvider(); Assert.Equal(1, serviceProvider.GetRequiredService>().Value.SupportedProtocols.Count); Assert.Equal(0, serviceProvider.GetRequiredService>>().Value.SupportedProtocols.Count); + + Assert.Null(serviceProvider.GetRequiredService>().Value.HubFilters); + Assert.Single(serviceProvider.GetRequiredService>>().Value.HubFilters); } [Fact] @@ -342,6 +347,14 @@ public void WriteMessage(HubMessage message, IBufferWriter output) throw new NotImplementedException(); } } + + internal class CustomHubFilter : IHubFilter + { + public ValueTask InvokeMethodAsync(HubInvocationContext invocationContext, Func> next) + { + throw new NotImplementedException(); + } + } } namespace Microsoft.AspNetCore.SignalR.Internal diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 6bc112fb02d6..166f50ff04d8 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2794,7 +2794,6 @@ public async Task ConnectionTimesOutIfInitialPingAndThenNoMessages() var connectionHandlerTask = await client.ConnectAsync(connectionHandler); await client.Connected.OrTimeout(); await client.SendHubMessageAsync(PingMessage.Instance); - await client.InvokeAsync(nameof(MethodHub.ValueMethod)); clock.UtcNow = clock.UtcNow.AddMilliseconds(timeout + 1); client.TickHeartbeat(); @@ -2945,7 +2944,7 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() // Invoke another hub method (which will be blocked by the first method) in order to stop the timeout // This is how a real-world example would behave - await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod)); + await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod)).OrTimeout(); // Tick heartbeat while hub method is running to show that close isn't triggered client.TickHeartbeat(); From 71da1b55a00bdab2bc8280b809c767242c6ab648 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Thu, 16 Jul 2020 14:57:56 -0700 Subject: [PATCH 16/21] fast path --- src/SignalR/server/Core/src/HubConnectionContext.cs | 8 ++++++-- .../server/Core/src/Internal/DefaultHubDispatcher.cs | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index afcf0f02cd81..62459f5db524 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -74,8 +74,12 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo _systemClock = contextOptions.SystemClock ?? new SystemClock(); _lastSendTimeStamp = _systemClock.UtcNowTicks; - var maxInvokes = contextOptions.MaximumParallelInvocations; - ActiveInvocationLimit = new SemaphoreSlim(maxInvokes, maxInvokes); + // We'll be avoiding using the semaphore when the limit is set to 1, so no need to allocate it + var maxInvokeLimit = contextOptions.MaximumParallelInvocations; + if (maxInvokeLimit != 1) + { + ActiveInvocationLimit = new SemaphoreSlim(maxInvokeLimit, maxInvokeLimit); + } } internal StreamTracker StreamTracker diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index fcd00d528cb4..28f1afbe6790 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -259,7 +259,7 @@ private Task ProcessInvocation(HubConnectionContext connection, else { bool isStreamCall = descriptor.StreamingParameters != null; - if (!isStreamCall && !isStreamResponse) + if (connection.ActiveInvocationLimit != null && !isStreamCall && !isStreamResponse) { return connection.ActiveInvocationLimit.RunAsync(state => { From 0f2098a81dbc0ad67c4099e41c9bd5a6a7b57064 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 17 Jul 2020 11:52:13 -0700 Subject: [PATCH 17/21] stream things --- .../DefaultHubDispatcherBenchmark.cs | 1 + .../src/Internal/DefaultHubDispatcher.Log.cs | 8 ++ .../Core/src/Internal/DefaultHubDispatcher.cs | 79 ++++++------ .../HubConnectionHandlerTestUtils/Hubs.cs | 12 +- .../SignalR/test/HubConnectionHandlerTests.cs | 114 +++++++++++++++++- 5 files changed, 170 insertions(+), 44 deletions(-) diff --git a/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs b/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs index 6850989e3da7..62ad679754d1 100644 --- a/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs +++ b/src/SignalR/perf/Microbenchmarks/DefaultHubDispatcherBenchmark.cs @@ -47,6 +47,7 @@ public void GlobalSetup() var contextOptions = new HubConnectionContextOptions() { KeepAliveInterval = TimeSpan.Zero, + StreamBufferCapacity = 10, }; _connectionContext = new NoErrorHubConnectionContext(connection, contextOptions, NullLoggerFactory.Instance); diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs index 359c78f2db97..e6710f7e31e5 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.Log.cs @@ -79,6 +79,9 @@ private static class Log private static readonly Action _invalidHubParameters = LoggerMessage.Define(LogLevel.Debug, new EventId(22, "InvalidHubParameters"), "Parameters to hub method '{HubMethod}' are incorrect."); + private static readonly Action _invocationIdInUse = + LoggerMessage.Define(LogLevel.Debug, new EventId(23, "InvocationIdInUse"), "Invocation ID '{InvocationId}' is already in use."); + public static void ReceivedHubInvocation(ILogger logger, InvocationMessage invocationMessage) { _receivedHubInvocation(logger, invocationMessage, null); @@ -188,6 +191,11 @@ public static void InvalidHubParameters(ILogger logger, string hubMethod, Except { _invalidHubParameters(logger, hubMethod, exception); } + + public static void InvocationIdInUse(ILogger logger, string InvocationId) + { + _invocationIdInUse(logger, InvocationId, null); + } } } } diff --git a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs index 28f1afbe6790..ee6be9f57fd8 100644 --- a/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs +++ b/src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs @@ -326,38 +326,7 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, if (isStreamResponse) { - _ = ExecuteStreamInvocation(); - async Task ExecuteStreamInvocation() - { - cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); - connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts); - object result; - - try - { - result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider); - } - catch (Exception ex) - { - Log.FailedInvokingHubMethod(_logger, hubMethodInvocationMessage.Target, ex); - await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, - ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors)); - return; - } - - if (result == null) - { - Log.InvalidReturnValueFromStreamingMethod(_logger, methodExecutor.MethodInfo.Name); - await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection, - $"The value returned by the streaming method '{methodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>."); - return; - } - - var enumerable = descriptor.FromReturnedStream(result, cts.Token); - - Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor); - await StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerable, scope, hubActivator, hub, cts, hubMethodInvocationMessage); - } + _ = StreamAsync(hubMethodInvocationMessage.InvocationId, connection, arguments, scope, hubActivator, hub, cts, hubMethodInvocationMessage, descriptor); } else { @@ -447,13 +416,45 @@ private ValueTask CleanupInvocation(HubConnectionContext connection, HubMethodIn return scope.DisposeAsync(); } - private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerable enumerable, IServiceScope scope, - IHubActivator hubActivator, THub hub, CancellationTokenSource streamCts, HubMethodInvocationMessage hubMethodInvocationMessage) + private async Task StreamAsync(string invocationId, HubConnectionContext connection, object[] arguments, IServiceScope scope, + IHubActivator hubActivator, THub hub, CancellationTokenSource streamCts, HubMethodInvocationMessage hubMethodInvocationMessage, HubMethodDescriptor descriptor) { string error = null; + streamCts = streamCts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted); + try { + if (!connection.ActiveRequestCancellationSources.TryAdd(invocationId, streamCts)) + { + Log.InvocationIdInUse(_logger, invocationId); + error = $"Invocation ID '{invocationId}' is already in use."; + return; + } + + object result; + try + { + result = await ExecuteHubMethod(descriptor.MethodExecutor, hub, arguments, connection, scope.ServiceProvider); + } + catch (Exception ex) + { + Log.FailedInvokingHubMethod(_logger, hubMethodInvocationMessage.Target, ex); + error = ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors); + return; + } + + if (result == null) + { + Log.InvalidReturnValueFromStreamingMethod(_logger, descriptor.MethodExecutor.MethodInfo.Name); + error = $"The value returned by the streaming method '{descriptor.MethodExecutor.MethodInfo.Name}' is not a ChannelReader<> or IAsyncEnumerable<>."; + return; + } + + var enumerable = descriptor.FromReturnedStream(result, streamCts.Token); + + Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, descriptor.MethodExecutor); + await foreach (var streamItem in enumerable) { // Send the stream item @@ -468,8 +469,7 @@ private async Task StreamResultsAsync(string invocationId, HubConnectionContext catch (Exception ex) { // If the streaming method was canceled we don't want to send a HubException message - this is not an error case - if (!(ex is OperationCanceledException && connection.ActiveRequestCancellationSources.TryGetValue(invocationId, out var cts) - && cts.IsCancellationRequested)) + if (!(ex is OperationCanceledException && streamCts.IsCancellationRequested)) { error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex, _enableDetailedErrors); } @@ -478,15 +478,10 @@ private async Task StreamResultsAsync(string invocationId, HubConnectionContext { await CleanupInvocation(connection, hubMethodInvocationMessage, hubActivator, hub, scope); - // Dispose the linked CTS for the stream. streamCts.Dispose(); + connection.ActiveRequestCancellationSources.TryRemove(invocationId, out _); await connection.WriteAsync(CompletionMessage.WithError(invocationId, error)); - - if (connection.ActiveRequestCancellationSources.TryRemove(invocationId, out var cts)) - { - cts.Dispose(); - } } } diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs index 1fa0ec94c740..498ce608e4aa 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTestUtils/Hubs.cs @@ -700,13 +700,23 @@ public ChannelReader BlockingStream() return Channel.CreateUnbounded().Reader; } - public ChannelReader ThrowStream() + public ChannelReader ExceptionStream() { var channel = Channel.CreateUnbounded(); channel.Writer.TryComplete(new Exception("Exception from channel")); return channel.Reader; } + public ChannelReader ThrowStream() + { + throw new Exception("Throw from hub method"); + } + + public ChannelReader NullStream() + { + return null; + } + public int NonStream() { return 42; diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 166f50ff04d8..0f97aa252302 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2087,7 +2087,7 @@ public async Task ReceiveCorrectErrorFromStreamThrowing(bool detailedErrors) await client.Connected.OrTimeout(); - var messages = await client.StreamAsync(nameof(StreamingHub.ThrowStream)); + var messages = await client.StreamAsync(nameof(StreamingHub.ExceptionStream)); Assert.Equal(1, messages.Count); var completion = messages[0] as CompletionMessage; @@ -3108,6 +3108,118 @@ public async Task StreamingInvocationsDoNotBlockOtherInvocations() } } + [Fact] + public async Task StreamMethodThatThrowsWillCleanup() + { + bool ExpectedErrors(WriteContext writeContext) + { + return writeContext.LoggerName == "Microsoft.AspNetCore.SignalR.Internal.DefaultHubDispatcher" && + writeContext.EventId.Name == "FailedInvokingHubMethod"; + } + + using (StartVerifiableLog(ExpectedErrors)) + { + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => + { + builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>)); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); + + await client.Connected.OrTimeout(); + + var messages = await client.StreamAsync(nameof(StreamingHub.ThrowStream)); + + Assert.Equal(1, messages.Count); + var completion = messages[0] as CompletionMessage; + Assert.NotNull(completion); + + var hubActivator = serviceProvider.GetService>() as CustomHubActivator; + + // OnConnectedAsync and ThrowStream hubs have been disposed + Assert.Equal(2, hubActivator.ReleaseCount); + + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + } + } + } + + [Fact] + public async Task StreamMethodThatReturnsNullWillCleanup() + { + using (StartVerifiableLog()) + { + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => + { + builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>)); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); + + await client.Connected.OrTimeout(); + + var messages = await client.StreamAsync(nameof(StreamingHub.NullStream)); + + Assert.Equal(1, messages.Count); + var completion = messages[0] as CompletionMessage; + Assert.NotNull(completion); + + var hubActivator = serviceProvider.GetService>() as CustomHubActivator; + + // OnConnectedAsync and NullStream hubs have been disposed + Assert.Equal(2, hubActivator.ReleaseCount); + + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + } + } + } + + [Fact] + public async Task StreamMethodWithDuplicateIdFails() + { + using (StartVerifiableLog()) + { + var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder => + { + builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>)); + }, LoggerFactory); + var connectionHandler = serviceProvider.GetService>(); + + using (var client = new TestClient()) + { + var connectionHandlerTask = await client.ConnectAsync(connectionHandler); + + await client.Connected.OrTimeout(); + + await client.SendHubMessageAsync(new StreamInvocationMessage("123", nameof(StreamingHub.BlockingStream), Array.Empty())).OrTimeout(); + + await client.SendHubMessageAsync(new StreamInvocationMessage("123", nameof(StreamingHub.BlockingStream), Array.Empty())).OrTimeout(); + + var completion = Assert.IsType(await client.ReadAsync().OrTimeout()); + Assert.Equal("Invocation ID '123' is already in use.", completion.Error); + + var hubActivator = serviceProvider.GetService>() as CustomHubActivator; + + // OnConnectedAsync and BlockingStream hubs have been disposed + Assert.Equal(2, hubActivator.ReleaseCount); + + client.Dispose(); + + await connectionHandlerTask.OrTimeout(); + } + } + } + [Fact] public async Task InvocationsRunInOrderWithNoParallelism() { From e78d587432aed8f9eb93f4f97c4196a96d218f84 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Fri, 31 Jul 2020 15:50:08 -0700 Subject: [PATCH 18/21] change test --- src/SignalR/server/Core/src/HubConnectionContext.cs | 3 ++- src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs | 4 ---- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionContext.cs b/src/SignalR/server/Core/src/HubConnectionContext.cs index 62459f5db524..9f94b6a8b0d4 100644 --- a/src/SignalR/server/Core/src/HubConnectionContext.cs +++ b/src/SignalR/server/Core/src/HubConnectionContext.cs @@ -99,7 +99,8 @@ internal StreamTracker StreamTracker internal HubCallerContext HubCallerContext { get; } internal Exception? CloseException { get; private set; } - internal SemaphoreSlim ActiveInvocationLimit { get; } + + internal SemaphoreSlim? ActiveInvocationLimit { get; } /// /// Gets a that notifies when the connection is aborted. diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index 0f97aa252302..c7b8410469ef 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2942,10 +2942,6 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() var hubMethodTask = client.InvokeAsync(nameof(LongRunningHub.LongRunningMethod)); await tcsService.StartedMethod.Task.OrTimeout(); - // Invoke another hub method (which will be blocked by the first method) in order to stop the timeout - // This is how a real-world example would behave - await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod)).OrTimeout(); - // Tick heartbeat while hub method is running to show that close isn't triggered client.TickHeartbeat(); From f198f5c40553e7d2dba33e021767c489e60fd299 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 17 Aug 2020 12:15:39 -0700 Subject: [PATCH 19/21] fb --- .../server/Core/src/HubConnectionHandler.cs | 4 +-- src/SignalR/server/Core/src/HubOptions.cs | 10 +++--- .../server/Core/src/HubOptionsSetup`T.cs | 2 +- .../src/Internal/SemaphoreSlimExtensions.cs | 31 +++++++++++++------ .../server/SignalR/test/AddSignalRTests.cs | 8 ++--- .../SignalR/test/HubConnectionHandlerTests.cs | 16 +++++----- 6 files changed, 41 insertions(+), 30 deletions(-) diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index fab814f80738..e10203ca3940 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -72,7 +72,7 @@ IServiceScopeFactory serviceScopeFactory { _maximumMessageSize = _hubOptions.MaximumReceiveMessageSize; _enableDetailedErrors = _hubOptions.EnableDetailedErrors ?? _enableDetailedErrors; - _maxParallelInvokes = _hubOptions.MaxParallelInvocationsPerClient; + _maxParallelInvokes = _hubOptions.MaximumParallelInvocationsPerClient; if (_hubOptions.HubFilters != null) { @@ -84,7 +84,7 @@ IServiceScopeFactory serviceScopeFactory { _maximumMessageSize = _globalHubOptions.MaximumReceiveMessageSize; _enableDetailedErrors = _globalHubOptions.EnableDetailedErrors ?? _enableDetailedErrors; - _maxParallelInvokes = _globalHubOptions.MaxParallelInvocationsPerClient; + _maxParallelInvokes = _globalHubOptions.MaximumParallelInvocationsPerClient; if (_globalHubOptions.HubFilters != null) { diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index 527a6076e129..c3dd5033c784 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -58,20 +58,20 @@ public class HubOptions /// By default a client is only allowed to invoke a single Hub method at a time. /// Changing this property will allow clients to invoke multiple methods at the same time before queueing. /// - public int MaxParallelInvocationsPerClient + public int MaximumParallelInvocationsPerClient { - get => _maxParallelInvocationsPerClient; + get => _maximumParallelInvocationsPerClient; set { if (value < 1) { - throw new ArgumentOutOfRangeException(nameof(MaxParallelInvocationsPerClient)); + throw new ArgumentOutOfRangeException(nameof(MaximumParallelInvocationsPerClient)); } - _maxParallelInvocationsPerClient = value; + _maximumParallelInvocationsPerClient = value; } } - private int _maxParallelInvocationsPerClient = 1; + private int _maximumParallelInvocationsPerClient = 1; } } diff --git a/src/SignalR/server/Core/src/HubOptionsSetup`T.cs b/src/SignalR/server/Core/src/HubOptionsSetup`T.cs index dfcdca74d676..1dfae3de0c9f 100644 --- a/src/SignalR/server/Core/src/HubOptionsSetup`T.cs +++ b/src/SignalR/server/Core/src/HubOptionsSetup`T.cs @@ -25,7 +25,7 @@ public void Configure(HubOptions options) options.EnableDetailedErrors = _hubOptions.EnableDetailedErrors; options.MaximumReceiveMessageSize = _hubOptions.MaximumReceiveMessageSize; options.StreamBufferCapacity = _hubOptions.StreamBufferCapacity; - options.MaxParallelInvocationsPerClient = _hubOptions.MaxParallelInvocationsPerClient; + options.MaximumParallelInvocationsPerClient = _hubOptions.MaximumParallelInvocationsPerClient; options.UserHasSetValues = true; diff --git a/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs b/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs index 8f0d9e713847..4650fb11e894 100644 --- a/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs +++ b/src/SignalR/server/Core/src/Internal/SemaphoreSlimExtensions.cs @@ -9,21 +9,32 @@ namespace Microsoft.AspNetCore.SignalR.Internal { internal static class SemaphoreSlimExtensions { - public static async Task RunAsync(this SemaphoreSlim semaphoreSlim, Func callback, TState state) + public static Task RunAsync(this SemaphoreSlim semaphoreSlim, Func callback, TState state) + { + if (semaphoreSlim.Wait(0)) + { + _ = RunTask(callback, semaphoreSlim, state); + return Task.CompletedTask; + } + + return RunSlowAsync(semaphoreSlim, callback, state); + } + + private static async Task RunSlowAsync(this SemaphoreSlim semaphoreSlim, Func callback, TState state) { await semaphoreSlim.WaitAsync(); return RunTask(callback, semaphoreSlim, state); + } - static async Task RunTask(Func callback, SemaphoreSlim semaphoreSlim, TState state) + static async Task RunTask(Func callback, SemaphoreSlim semaphoreSlim, TState state) + { + try + { + await callback(state); + } + finally { - try - { - await callback(state); - } - finally - { - semaphoreSlim.Release(); - } + semaphoreSlim.Release(); } } } diff --git a/src/SignalR/server/SignalR/test/AddSignalRTests.cs b/src/SignalR/server/SignalR/test/AddSignalRTests.cs index 437293be4bf4..1eca6c63eddc 100644 --- a/src/SignalR/server/SignalR/test/AddSignalRTests.cs +++ b/src/SignalR/server/SignalR/test/AddSignalRTests.cs @@ -110,7 +110,7 @@ public void HubSpecificOptionsHaveSameValuesAsGlobalHubOptions() Assert.Equal(globalHubOptions.HandshakeTimeout, hubOptions.HandshakeTimeout); Assert.Equal(globalHubOptions.SupportedProtocols, hubOptions.SupportedProtocols); Assert.Equal(globalHubOptions.ClientTimeoutInterval, hubOptions.ClientTimeoutInterval); - Assert.Equal(globalHubOptions.MaxParallelInvocationsPerClient, hubOptions.MaxParallelInvocationsPerClient); + Assert.Equal(globalHubOptions.MaximumParallelInvocationsPerClient, hubOptions.MaximumParallelInvocationsPerClient); Assert.True(hubOptions.UserHasSetValues); } @@ -144,7 +144,7 @@ public void UserSpecifiedOptionsRunAfterDefaultOptions() options.HandshakeTimeout = null; options.SupportedProtocols = null; options.ClientTimeoutInterval = TimeSpan.FromSeconds(1); - options.MaxParallelInvocationsPerClient = 3; + options.MaximumParallelInvocationsPerClient = 3; }); var serviceProvider = serviceCollection.BuildServiceProvider(); @@ -156,7 +156,7 @@ public void UserSpecifiedOptionsRunAfterDefaultOptions() Assert.Null(globalOptions.KeepAliveInterval); Assert.Null(globalOptions.HandshakeTimeout); Assert.Null(globalOptions.SupportedProtocols); - Assert.Equal(3, globalOptions.MaxParallelInvocationsPerClient); + Assert.Equal(3, globalOptions.MaximumParallelInvocationsPerClient); Assert.Equal(TimeSpan.FromSeconds(1), globalOptions.ClientTimeoutInterval); } @@ -187,7 +187,7 @@ public void HubProtocolsWithNonDefaultAttributeNotAddedToSupportedProtocols() [Fact] public void ThrowsIfSetInvalidValueForMaxInvokes() { - Assert.Throws(() => new HubOptions() { MaxParallelInvocationsPerClient = 0 }); + Assert.Throws(() => new HubOptions() { MaximumParallelInvocationsPerClient = 0 }); } } diff --git a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs index c7b8410469ef..862cde22d499 100644 --- a/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs +++ b/src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs @@ -2923,7 +2923,7 @@ public async Task HubMethodInvokeDoesNotCountTowardsClientTimeout() services.Configure(options => { options.ClientTimeoutInterval = TimeSpan.FromMilliseconds(0); - options.MaxParallelInvocationsPerClient = 1; + options.MaximumParallelInvocationsPerClient = 1; }); services.AddSingleton(tcsService); }, LoggerFactory); @@ -2975,7 +2975,7 @@ public async Task HubMethodInvokeCountsTowardsClientTimeoutIfParallelNotMaxed() services.Configure(options => { options.ClientTimeoutInterval = TimeSpan.FromMilliseconds(0); - options.MaxParallelInvocationsPerClient = 2; + options.MaximumParallelInvocationsPerClient = 2; }); services.AddSingleton(tcsService); }, LoggerFactory); @@ -3081,7 +3081,7 @@ public async Task StreamingInvocationsDoNotBlockOtherInvocations() { services.AddSignalR(options => { - options.MaxParallelInvocationsPerClient = 1; + options.MaximumParallelInvocationsPerClient = 1; }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3228,7 +3228,7 @@ public async Task InvocationsRunInOrderWithNoParallelism() builder.AddSignalR(options => { - options.MaxParallelInvocationsPerClient = 1; + options.MaximumParallelInvocationsPerClient = 1; }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3283,7 +3283,7 @@ public async Task InvocationsCanRunOutOfOrderWithParallelism() builder.AddSignalR(options => { - options.MaxParallelInvocationsPerClient = 2; + options.MaximumParallelInvocationsPerClient = 2; }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3339,7 +3339,7 @@ public async Task PendingInvocationUnblockedWhenBlockingMethodCompletesWithParal builder.AddSignalR(options => { - options.MaxParallelInvocationsPerClient = 2; + options.MaximumParallelInvocationsPerClient = 2; }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3410,7 +3410,7 @@ public async Task StreamInvocationsDoNotBlockOtherInvocations() builder.AddSignalR(options => { - options.MaxParallelInvocationsPerClient = 1; + options.MaximumParallelInvocationsPerClient = 1; }); }, LoggerFactory); var connectionHandler = serviceProvider.GetService>(); @@ -3670,7 +3670,7 @@ public async Task UploadStreamDoesNotCountTowardsMaxInvocationLimit() var tcsService = new TcsService(); var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services => { - services.AddSignalR(options => options.MaxParallelInvocationsPerClient = 1); + services.AddSignalR(options => options.MaximumParallelInvocationsPerClient = 1); services.AddSingleton(tcsService); }); var connectionHandler = serviceProvider.GetService>(); From a0d5b74ec86e1c8bb7573d03418f8f3ba9b848f8 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 17 Aug 2020 13:08:25 -0700 Subject: [PATCH 20/21] move field --- src/SignalR/server/Core/src/HubOptions.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/SignalR/server/Core/src/HubOptions.cs b/src/SignalR/server/Core/src/HubOptions.cs index c3dd5033c784..684b8c343b29 100644 --- a/src/SignalR/server/Core/src/HubOptions.cs +++ b/src/SignalR/server/Core/src/HubOptions.cs @@ -11,6 +11,8 @@ namespace Microsoft.AspNetCore.SignalR /// public class HubOptions { + private int _maximumParallelInvocationsPerClient = 1; + // HandshakeTimeout and KeepAliveInterval are set to null here to help identify when // local hub options have been set. Global default values are set in HubOptionsSetup. // SupportedProtocols being null is the true default value, and it represents support @@ -71,7 +73,5 @@ public int MaximumParallelInvocationsPerClient _maximumParallelInvocationsPerClient = value; } } - - private int _maximumParallelInvocationsPerClient = 1; } } From d4fedad32b089db44f4f5999ba6edecb35fe2961 Mon Sep 17 00:00:00 2001 From: BrennanConroy Date: Mon, 17 Aug 2020 14:43:52 -0700 Subject: [PATCH 21/21] comment --- src/SignalR/server/Core/src/HubConnectionHandler.cs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/SignalR/server/Core/src/HubConnectionHandler.cs b/src/SignalR/server/Core/src/HubConnectionHandler.cs index e10203ca3940..ab6fbd8c01c8 100644 --- a/src/SignalR/server/Core/src/HubConnectionHandler.cs +++ b/src/SignalR/server/Core/src/HubConnectionHandler.cs @@ -265,6 +265,7 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) while (protocol.TryParseMessage(ref buffer, binder, out var message)) { connection.StopClientTimeout(); + // This lets us know the timeout has stopped and we need to re-enable it after dispatching the message messageReceived = true; await _dispatcher.DispatchMessageAsync(connection, message); } @@ -293,6 +294,7 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection) if (protocol.TryParseMessage(ref segment, binder, out var message)) { connection.StopClientTimeout(); + // This lets us know the timeout has stopped and we need to re-enable it after dispatching the message messageReceived = true; await _dispatcher.DispatchMessageAsync(connection, message); }