Skip to content

Commit 1957655

Browse files
Run hub invocations serially (#2086)
1 parent 4e8c1e1 commit 1957655

File tree

4 files changed

+240
-42
lines changed

4 files changed

+240
-42
lines changed

src/Microsoft.AspNetCore.SignalR.Core/HubConnectionHandler.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,6 @@ private async Task SendCloseAsync(HubConnectionContext connection, Exception exc
164164

165165
private async Task DispatchMessagesAsync(HubConnectionContext connection)
166166
{
167-
// Since we dispatch multiple hub invocations in parallel, we need a way to communicate failure back to the main processing loop.
168-
// This is done by aborting the connection.
169-
170167
try
171168
{
172169
var input = connection.Input;
@@ -182,9 +179,9 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
182179
{
183180
while (protocol.TryParseMessage(ref buffer, _dispatcher, out var message))
184181
{
185-
// Don't wait on the result of execution, continue processing other
186-
// incoming messages on this connection.
187-
_ = _dispatcher.DispatchMessageAsync(connection, message);
182+
// Messages are dispatched sequentially and will block other messages from being processed until they complete.
183+
// Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run.
184+
await _dispatcher.DispatchMessageAsync(connection, message);
188185
}
189186
}
190187
else if (result.IsCompleted)

src/Microsoft.AspNetCore.SignalR.Core/Internal/DefaultHubDispatcher.cs

Lines changed: 48 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,11 @@ private async Task Invoke(HubMethodDescriptor descriptor, HubConnectionContext c
170170
{
171171
var methodExecutor = descriptor.MethodExecutor;
172172

173-
using (var scope = _serviceScopeFactory.CreateScope())
173+
var disposeScope = true;
174+
var scope = _serviceScopeFactory.CreateScope();
175+
IHubActivator<THub> hubActivator = null;
176+
THub hub = null;
177+
try
174178
{
175179
if (!await IsHubMethodAuthorized(scope.ServiceProvider, connection.User, descriptor.Policies))
176180
{
@@ -185,8 +189,8 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
185189
return;
186190
}
187191

188-
var hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
189-
var hub = hubActivator.Create();
192+
hubActivator = scope.ServiceProvider.GetRequiredService<IHubActivator<THub>>();
193+
hub = hubActivator.Create();
190194

191195
try
192196
{
@@ -205,8 +209,10 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
205209
return;
206210
}
207211

212+
disposeScope = false;
208213
Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor);
209-
await StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerator, streamCts);
214+
// Fire-and-forget stream invocations, otherwise they would block other hub invocations from being able to run
215+
_ = StreamResultsAsync(hubMethodInvocationMessage.InvocationId, connection, enumerator, scope, hubActivator, hub, streamCts);
210216
}
211217
// Non-empty/null InvocationId ==> Blocking invocation that needs a response
212218
else if (!string.IsNullOrEmpty(hubMethodInvocationMessage.InvocationId))
@@ -227,51 +233,60 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
227233
await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
228234
ErrorMessageHelper.BuildErrorMessage($"An unexpected error occurred invoking '{hubMethodInvocationMessage.Target}' on the server.", ex, _enableDetailedErrors));
229235
}
230-
finally
236+
}
237+
finally
238+
{
239+
if (disposeScope)
231240
{
232-
hubActivator.Release(hub);
241+
hubActivator?.Release(hub);
242+
scope.Dispose();
233243
}
234244
}
235245
}
236246

237-
private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerator<object> enumerator, CancellationTokenSource streamCts)
247+
private async Task StreamResultsAsync(string invocationId, HubConnectionContext connection, IAsyncEnumerator<object> enumerator, IServiceScope scope, IHubActivator<THub> hubActivator, THub hub, CancellationTokenSource streamCts)
238248
{
239249
string error = null;
240250

241-
try
251+
using (scope)
242252
{
243-
while (await enumerator.MoveNextAsync())
253+
try
244254
{
245-
// Send the stream item
246-
await connection.WriteAsync(new StreamItemMessage(invocationId, enumerator.Current));
255+
while (await enumerator.MoveNextAsync())
256+
{
257+
// Send the stream item
258+
await connection.WriteAsync(new StreamItemMessage(invocationId, enumerator.Current));
259+
}
247260
}
248-
}
249-
catch (ChannelClosedException ex)
250-
{
251-
// If the channel closes from an exception in the streaming method, grab the innerException for the error from the streaming method
252-
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex.InnerException ?? ex, _enableDetailedErrors);
253-
}
254-
catch (Exception ex)
255-
{
256-
// If the streaming method was canceled we don't want to send a HubException message - this is not an error case
257-
if (!(ex is OperationCanceledException && connection.ActiveRequestCancellationSources.TryGetValue(invocationId, out var cts)
258-
&& cts.IsCancellationRequested))
261+
catch (ChannelClosedException ex)
259262
{
260-
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex, _enableDetailedErrors);
263+
// If the channel closes from an exception in the streaming method, grab the innerException for the error from the streaming method
264+
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex.InnerException ?? ex, _enableDetailedErrors);
261265
}
262-
}
263-
finally
264-
{
265-
(enumerator as IDisposable)?.Dispose();
266+
catch (Exception ex)
267+
{
268+
// If the streaming method was canceled we don't want to send a HubException message - this is not an error case
269+
if (!(ex is OperationCanceledException && connection.ActiveRequestCancellationSources.TryGetValue(invocationId, out var cts)
270+
&& cts.IsCancellationRequested))
271+
{
272+
error = ErrorMessageHelper.BuildErrorMessage("An error occurred on the server while streaming results.", ex, _enableDetailedErrors);
273+
}
274+
}
275+
finally
276+
{
277+
(enumerator as IDisposable)?.Dispose();
278+
279+
hubActivator.Release(hub);
266280

267-
// Dispose the linked CTS for the stream.
268-
streamCts.Dispose();
281+
// Dispose the linked CTS for the stream.
282+
streamCts.Dispose();
269283

270-
await connection.WriteAsync(CompletionMessage.WithError(invocationId, error));
284+
await connection.WriteAsync(CompletionMessage.WithError(invocationId, error));
271285

272-
if (connection.ActiveRequestCancellationSources.TryRemove(invocationId, out var cts))
273-
{
274-
cts.Dispose();
286+
if (connection.ActiveRequestCancellationSources.TryRemove(invocationId, out var cts))
287+
{
288+
cts.Dispose();
289+
}
275290
}
276291
}
277292
}

test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTestUtils/Hubs.cs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33

44
using System;
55
using System.Collections.Generic;
6-
using System.Reactive.Linq;
7-
using System.Threading;
86
using System.Threading.Channels;
97
using System.Threading.Tasks;
108
using Microsoft.AspNetCore.Authorization;
@@ -431,7 +429,6 @@ public void Kill()
431429

432430
public class StreamingHub : TestHub
433431
{
434-
435432
public ChannelReader<string> CounterChannel(int count)
436433
{
437434
var channel = Channel.CreateUnbounded<string>();
@@ -471,6 +468,11 @@ public ChannelReader<int> ThrowStream()
471468
channel.Writer.TryComplete(new Exception("Exception from channel"));
472469
return channel.Reader;
473470
}
471+
472+
public int NonStream()
473+
{
474+
return 42;
475+
}
474476
}
475477

476478
public class SimpleHub : Hub
@@ -491,6 +493,42 @@ public override async Task OnConnectedAsync()
491493
}
492494
}
493495

496+
public class LongRunningHub : Hub
497+
{
498+
private TcsService _tcsService;
499+
500+
public LongRunningHub(TcsService tcsService)
501+
{
502+
_tcsService = tcsService;
503+
}
504+
505+
public async Task<int> LongRunningMethod()
506+
{
507+
_tcsService.StartedMethod.TrySetResult(null);
508+
await _tcsService.EndMethod.Task;
509+
return 12;
510+
}
511+
512+
public async Task<ChannelReader<string>> LongRunningStream()
513+
{
514+
_tcsService.StartedMethod.TrySetResult(null);
515+
await _tcsService.EndMethod.Task;
516+
// Never ending stream
517+
return Channel.CreateUnbounded<string>().Reader;
518+
}
519+
520+
public int SimpleMethod()
521+
{
522+
return 21;
523+
}
524+
}
525+
526+
public class TcsService
527+
{
528+
public TaskCompletionSource<object> StartedMethod = new TaskCompletionSource<object>();
529+
public TaskCompletionSource<object> EndMethod = new TaskCompletionSource<object>();
530+
}
531+
494532
public interface ITypedHubClient
495533
{
496534
Task Send(string message);

test/Microsoft.AspNetCore.SignalR.Tests/HubConnectionHandlerTests.cs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
using Microsoft.AspNetCore.Connections;
1414
using Microsoft.AspNetCore.Http;
1515
using Microsoft.AspNetCore.Http.Connections.Features;
16+
using Microsoft.AspNetCore.SignalR.Internal;
1617
using Microsoft.AspNetCore.SignalR.Protocol;
1718
using Microsoft.Extensions.DependencyInjection;
1819
using Microsoft.Extensions.Options;
@@ -1982,6 +1983,153 @@ public async Task ErrorInHubOnConnectSendsCloseMessageWithError(bool detailedErr
19821983
}
19831984
}
19841985

1986+
[Fact]
1987+
public async Task StreamingInvocationsDoNotBlockOtherInvocations()
1988+
{
1989+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider();
1990+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<StreamingHub>>();
1991+
1992+
using (var client = new TestClient(new JsonHubProtocol()))
1993+
{
1994+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
1995+
1996+
// Blocking streaming invocation to test that other invocations can still run
1997+
await client.SendHubMessageAsync(new StreamInvocationMessage("1", nameof(StreamingHub.BlockingStream), Array.Empty<object>())).OrTimeout();
1998+
1999+
var completion = await client.InvokeAsync(nameof(StreamingHub.NonStream)).OrTimeout();
2000+
Assert.Equal(42L, completion.Result);
2001+
2002+
// Shut down
2003+
client.Dispose();
2004+
2005+
await connectionHandlerTask.OrTimeout();
2006+
}
2007+
}
2008+
2009+
[Fact]
2010+
public async Task InvocationsRunInOrder()
2011+
{
2012+
var tcsService = new TcsService();
2013+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
2014+
{
2015+
builder.AddSingleton(tcsService);
2016+
});
2017+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<LongRunningHub>>();
2018+
2019+
// Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call
2020+
using (var client = new TestClient())
2021+
{
2022+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
2023+
2024+
// Long running hub invocation to test that other invocations will not run until it is completed
2025+
await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout();
2026+
// Wait for the long running method to start
2027+
await tcsService.StartedMethod.Task.OrTimeout();
2028+
2029+
// Invoke another hub method which will wait for the first method to finish
2030+
await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout();
2031+
// Both invocations should be waiting now
2032+
Assert.Null(client.TryRead());
2033+
2034+
// Release the long running hub method
2035+
tcsService.EndMethod.TrySetResult(null);
2036+
2037+
// Long running hub method result
2038+
var firstResult = await client.ReadAsync().OrTimeout();
2039+
2040+
var longRunningCompletion = Assert.IsType<CompletionMessage>(firstResult);
2041+
Assert.Equal(12L, longRunningCompletion.Result);
2042+
2043+
// simple hub method result
2044+
var secondResult = await client.ReadAsync().OrTimeout();
2045+
2046+
var simpleCompletion = Assert.IsType<CompletionMessage>(secondResult);
2047+
Assert.Equal(21L, simpleCompletion.Result);
2048+
2049+
// Shut down
2050+
client.Dispose();
2051+
2052+
await connectionHandlerTask.OrTimeout();
2053+
}
2054+
}
2055+
2056+
[Fact]
2057+
public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming()
2058+
{
2059+
var tcsService = new TcsService();
2060+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
2061+
{
2062+
builder.AddSingleton(tcsService);
2063+
builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>));
2064+
});
2065+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<LongRunningHub>>();
2066+
2067+
// Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call
2068+
using (var client = new TestClient())
2069+
{
2070+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
2071+
2072+
// Long running hub invocation to test that other invocations will not run until it is completed
2073+
var streamInvocationId = await client.SendStreamInvocationAsync(nameof(LongRunningHub.LongRunningStream)).OrTimeout();
2074+
// Wait for the long running method to start
2075+
await tcsService.StartedMethod.Task.OrTimeout();
2076+
2077+
// Invoke another hub method which will wait for the first method to finish
2078+
await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout();
2079+
// Both invocations should be waiting now
2080+
Assert.Null(client.TryRead());
2081+
2082+
// Release the long running hub method
2083+
tcsService.EndMethod.TrySetResult(null);
2084+
2085+
// simple hub method result
2086+
var result = await client.ReadAsync().OrTimeout();
2087+
2088+
var simpleCompletion = Assert.IsType<CompletionMessage>(result);
2089+
Assert.Equal(21L, simpleCompletion.Result);
2090+
2091+
var hubActivator = serviceProvider.GetService<IHubActivator<LongRunningHub>>() as CustomHubActivator<LongRunningHub>;
2092+
2093+
// OnConnectedAsync and SimpleMethod hubs have been disposed at this point
2094+
Assert.Equal(2, hubActivator.ReleaseCount);
2095+
2096+
await client.SendHubMessageAsync(new CancelInvocationMessage(streamInvocationId)).OrTimeout();
2097+
2098+
// Completion message for canceled Stream
2099+
await client.ReadAsync().OrTimeout();
2100+
2101+
// Stream method is now disposed
2102+
Assert.Equal(3, hubActivator.ReleaseCount);
2103+
2104+
// Shut down
2105+
client.Dispose();
2106+
2107+
await connectionHandlerTask.OrTimeout();
2108+
}
2109+
}
2110+
2111+
private class CustomHubActivator<THub> : IHubActivator<THub> where THub : Hub
2112+
{
2113+
public int ReleaseCount;
2114+
private IServiceProvider _serviceProvider;
2115+
2116+
public CustomHubActivator(IServiceProvider serviceProvider)
2117+
{
2118+
_serviceProvider = serviceProvider;
2119+
}
2120+
2121+
public THub Create()
2122+
{
2123+
return new DefaultHubActivator<THub>(_serviceProvider).Create();
2124+
}
2125+
2126+
public void Release(THub hub)
2127+
{
2128+
ReleaseCount++;
2129+
hub.Dispose();
2130+
}
2131+
}
2132+
19852133
public static IEnumerable<object[]> HubTypes()
19862134
{
19872135
yield return new[] { typeof(DynamicTestHub) };

0 commit comments

Comments
 (0)