Skip to content

Commit fe6119a

Browse files
committed
layer
1 parent 1fa4a91 commit fe6119a

File tree

3 files changed

+107
-32
lines changed

3 files changed

+107
-32
lines changed

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
262262
while (protocol.TryParseMessage(ref buffer, binder, out var message))
263263
{
264264
messageReceived = true;
265-
await _dispatcher.DispatchMessageAsync(connection, message);
265+
await DispatchMessage(connection, _dispatcher, message);
266266
}
267267

268268
if (messageReceived)
@@ -289,8 +289,7 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
289289
if (protocol.TryParseMessage(ref segment, binder, out var message))
290290
{
291291
messageReceived = true;
292-
293-
await _dispatcher.DispatchMessageAsync(connection, message);
292+
await DispatchMessage(connection, _dispatcher, message);
294293
}
295294
else if (overLength)
296295
{
@@ -329,6 +328,25 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
329328
// before yielding the read again.
330329
input.AdvanceTo(buffer.Start, buffer.End);
331330
}
331+
332+
static async Task DispatchMessage(HubConnectionContext connection, HubDispatcher<THub> dispatcher, HubMessage message)
333+
{
334+
connection.StopClientTimeout();
335+
_ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message));
336+
await connection.ActiveInvocationLimit.WaitAsync();
337+
}
338+
339+
static async Task ProcessTask(HubConnectionContext connection, Task task)
340+
{
341+
try
342+
{
343+
await task;
344+
}
345+
finally
346+
{
347+
connection.ActiveInvocationLimit.Release();
348+
}
349+
}
332350
}
333351
}
334352

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 9 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -142,31 +142,25 @@ public override async Task OnDisconnectedAsync(HubConnectionContext connection,
142142

143143
public override Task DispatchMessageAsync(HubConnectionContext connection, HubMessage hubMessage)
144144
{
145-
connection.StopClientTimeout();
146-
147145
// Messages are dispatched sequentially and will stop other messages from being processed until they complete.
148146
// Streaming methods will run sequentially until they start streaming, then they will fire-and-forget allowing other messages to run.
149147
// With parallel invokes enabled, messages run sequentially until they go async and then the next message will be allowed to start running.
150148

151149
switch (hubMessage)
152150
{
153151
case InvocationBindingFailureMessage bindingFailureMessage:
154-
_ = ProcessTask(connection, ProcessInvocationBindingFailure(connection, bindingFailureMessage));
155-
break;
152+
return ProcessInvocationBindingFailure(connection, bindingFailureMessage);
156153

157154
case StreamBindingFailureMessage bindingFailureMessage:
158-
_ = ProcessTask(connection, ProcessStreamBindingFailure(connection, bindingFailureMessage));
159-
break;
155+
return ProcessStreamBindingFailure(connection, bindingFailureMessage);
160156

161157
case InvocationMessage invocationMessage:
162158
Log.ReceivedHubInvocation(_logger, invocationMessage);
163-
_ = ProcessTask(connection, ProcessInvocation(connection, invocationMessage, isStreamResponse: false));
164-
break;
159+
return ProcessInvocation(connection, invocationMessage, isStreamResponse: false);
165160

166161
case StreamInvocationMessage streamInvocationMessage:
167162
Log.ReceivedStreamHubInvocation(_logger, streamInvocationMessage);
168-
_ = ProcessTask(connection, ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true));
169-
break;
163+
return ProcessInvocation(connection, streamInvocationMessage, isStreamResponse: true);
170164

171165
case CancelInvocationMessage cancelInvocationMessage:
172166
// Check if there is an associated active stream and cancel it if it exists.
@@ -181,15 +175,14 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe
181175
// Stream can be canceled on the server while client is canceling stream.
182176
Log.UnexpectedCancel(_logger);
183177
}
184-
return Task.CompletedTask;
178+
break;
185179

186180
case PingMessage _:
187181
connection.StartClientTimeout();
188-
return Task.CompletedTask;
182+
break;
189183

190184
case StreamItemMessage streamItem:
191-
_ = ProcessTask(connection, ProcessStreamItem(connection, streamItem));
192-
break;
185+
return ProcessStreamItem(connection, streamItem);
193186

194187
case CompletionMessage streamCompleteMessage:
195188
// closes channels, removes from Lookup dict
@@ -202,27 +195,15 @@ public override Task DispatchMessageAsync(HubConnectionContext connection, HubMe
202195
{
203196
Log.UnexpectedStreamCompletion(_logger);
204197
}
205-
return Task.CompletedTask;
198+
break;
206199

207200
// Other kind of message we weren't expecting
208201
default:
209202
Log.UnsupportedMessageReceived(_logger, hubMessage.GetType().FullName);
210203
throw new NotSupportedException($"Received unsupported message: {hubMessage}");
211204
}
212205

213-
return connection.ActiveInvocationLimit.WaitAsync();
214-
215-
static async Task ProcessTask(HubConnectionContext connection, Task task)
216-
{
217-
try
218-
{
219-
await task;
220-
}
221-
finally
222-
{
223-
connection.ActiveInvocationLimit.Release();
224-
}
225-
}
206+
return Task.CompletedTask;
226207
}
227208

228209
private Task ProcessInvocationBindingFailure(HubConnectionContext connection, InvocationBindingFailureMessage bindingFailureMessage)

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3057,7 +3057,13 @@ public async Task StreamingInvocationsDoNotBlockOtherInvocations()
30573057
{
30583058
using (StartVerifiableLog())
30593059
{
3060-
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(null, LoggerFactory);
3060+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(services =>
3061+
{
3062+
services.AddSignalR(options =>
3063+
{
3064+
options.MaxParallelInvocationsPerClient = 1;
3065+
});
3066+
}, LoggerFactory);
30613067
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<StreamingHub>>();
30623068

30633069
using (var client = new TestClient(new NewtonsoftJsonHubProtocol()))
@@ -3189,6 +3195,76 @@ public async Task InvocationsCanRunOutOfOrderWithParallelism()
31893195
}
31903196
}
31913197

3198+
[Fact]
3199+
public async Task PendingInvocationUnblockedWhenBlockingMethodCompletesWithParallelism()
3200+
{
3201+
using (StartVerifiableLog())
3202+
{
3203+
var tcsService = new TcsService();
3204+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
3205+
{
3206+
builder.AddSingleton(tcsService);
3207+
3208+
builder.AddSignalR(options =>
3209+
{
3210+
options.MaxParallelInvocationsPerClient = 2;
3211+
});
3212+
}, LoggerFactory);
3213+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<LongRunningHub>>();
3214+
3215+
// Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call
3216+
using (var client = new TestClient())
3217+
{
3218+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
3219+
3220+
// Long running hub invocation to test that other invocations will not run until it is completed
3221+
await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout();
3222+
// Wait for the long running method to start
3223+
await tcsService.StartedMethod.Task.OrTimeout();
3224+
// Grab the tcs before resetting to use in the second long running method
3225+
var endTcs = tcsService.EndMethod;
3226+
3227+
tcsService.Reset();
3228+
// Long running hub invocation to test that other invocations will not run until it is completed
3229+
await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout();
3230+
// Wait for the long running method to start
3231+
await tcsService.StartedMethod.Task.OrTimeout();
3232+
3233+
// Invoke another hub method which will wait for the first method to finish
3234+
await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout();
3235+
// Both invocations should be waiting now
3236+
Assert.Null(client.TryRead());
3237+
3238+
// Release the second long running hub method
3239+
tcsService.EndMethod.TrySetResult(null);
3240+
3241+
// Long running hub method result
3242+
var firstResult = await client.ReadAsync().OrTimeout();
3243+
3244+
var longRunningCompletion = Assert.IsType<CompletionMessage>(firstResult);
3245+
Assert.Equal(12L, longRunningCompletion.Result);
3246+
3247+
// simple hub method result
3248+
var secondResult = await client.ReadAsync().OrTimeout();
3249+
3250+
var simpleCompletion = Assert.IsType<CompletionMessage>(secondResult);
3251+
Assert.Equal(21L, simpleCompletion.Result);
3252+
3253+
// Release the first long running hub method
3254+
endTcs.TrySetResult(null);
3255+
3256+
firstResult = await client.ReadAsync().OrTimeout();
3257+
longRunningCompletion = Assert.IsType<CompletionMessage>(firstResult);
3258+
Assert.Equal(12L, longRunningCompletion.Result);
3259+
3260+
// Shut down
3261+
client.Dispose();
3262+
3263+
await connectionHandlerTask.OrTimeout();
3264+
}
3265+
}
3266+
}
3267+
31923268
[Fact]
31933269
public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming()
31943270
{

0 commit comments

Comments
 (0)