Skip to content

Commit 941ca57

Browse files
committed
wip
1 parent fe6119a commit 941ca57

File tree

3 files changed

+66
-5
lines changed

3 files changed

+66
-5
lines changed

src/SignalR/server/Core/src/HubConnectionHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,11 +329,11 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
329329
input.AdvanceTo(buffer.Start, buffer.End);
330330
}
331331

332-
static async Task DispatchMessage(HubConnectionContext connection, HubDispatcher<THub> dispatcher, HubMessage message)
332+
static Task DispatchMessage(HubConnectionContext connection, HubDispatcher<THub> dispatcher, HubMessage message)
333333
{
334334
connection.StopClientTimeout();
335335
_ = ProcessTask(connection, dispatcher.DispatchMessageAsync(connection, message));
336-
await connection.ActiveInvocationLimit.WaitAsync();
336+
return connection.ActiveInvocationLimit.WaitAsync();
337337
}
338338

339339
static async Task ProcessTask(HubConnectionContext connection, Task task)

src/SignalR/server/Core/src/Internal/DefaultHubDispatcher.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,10 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
349349

350350
if (isStreamResponse)
351351
{
352+
// TODO: Need to sync this with CancelInvocationMessage
353+
cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);
354+
connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts);
355+
352356
var result = await ExecuteHubMethod(methodExecutor, hub, arguments, connection, scope.ServiceProvider);
353357

354358
if (result == null)
@@ -359,8 +363,6 @@ await SendInvocationError(hubMethodInvocationMessage.InvocationId, connection,
359363
return;
360364
}
361365

362-
cts = cts ?? CancellationTokenSource.CreateLinkedTokenSource(connection.ConnectionAborted);
363-
connection.ActiveRequestCancellationSources.TryAdd(hubMethodInvocationMessage.InvocationId, cts);
364366
var enumerable = descriptor.FromReturnedStream(result, cts.Token);
365367

366368
Log.StreamingResult(_logger, hubMethodInvocationMessage.InvocationId, methodExecutor);

src/SignalR/server/SignalR/test/HubConnectionHandlerTests.cs

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2976,6 +2976,8 @@ public async Task HubMethodInvokeCountsTowardsClientTimeoutIfParallelNotMaxed()
29762976

29772977
// Connection is closed
29782978
await connectionHandlerTask.OrTimeout();
2979+
2980+
tcsService.EndMethod.SetResult(null);
29792981
}
29802982
}
29812983
}
@@ -3223,8 +3225,8 @@ public async Task PendingInvocationUnblockedWhenBlockingMethodCompletesWithParal
32233225
await tcsService.StartedMethod.Task.OrTimeout();
32243226
// Grab the tcs before resetting to use in the second long running method
32253227
var endTcs = tcsService.EndMethod;
3226-
32273228
tcsService.Reset();
3229+
32283230
// Long running hub invocation to test that other invocations will not run until it is completed
32293231
await client.SendInvocationAsync(nameof(LongRunningHub.LongRunningMethod), nonBlocking: false).OrTimeout();
32303232
// Wait for the long running method to start
@@ -3325,6 +3327,63 @@ public async Task StreamInvocationsBlockOtherInvocationsUntilTheyStartStreaming(
33253327
}
33263328
}
33273329

3330+
[Fact]
3331+
public async Task StreamInvocationsDoNotBlockOtherInvocationsWithParallelInvokes()
3332+
{
3333+
using (StartVerifiableLog())
3334+
{
3335+
var tcsService = new TcsService();
3336+
var serviceProvider = HubConnectionHandlerTestUtils.CreateServiceProvider(builder =>
3337+
{
3338+
builder.AddSingleton(tcsService);
3339+
builder.AddSingleton(typeof(IHubActivator<>), typeof(CustomHubActivator<>));
3340+
3341+
builder.AddSignalR(options =>
3342+
{
3343+
options.MaxParallelInvocationsPerClient = 2;
3344+
});
3345+
}, LoggerFactory);
3346+
var connectionHandler = serviceProvider.GetService<HubConnectionHandler<LongRunningHub>>();
3347+
3348+
// Because we use PipeScheduler.Inline the hub invocations will run inline until they wait, which happens inside the LongRunningMethod call
3349+
using (var client = new TestClient())
3350+
{
3351+
var connectionHandlerTask = await client.ConnectAsync(connectionHandler).OrTimeout();
3352+
3353+
// Long running hub invocation to test that other invocations will not run until it is completed
3354+
var streamInvocationId = await client.SendStreamInvocationAsync(nameof(LongRunningHub.LongRunningStream), null).OrTimeout();
3355+
// Wait for the long running method to start
3356+
await tcsService.StartedMethod.Task.OrTimeout();
3357+
3358+
// Invoke another hub method which will wait for the first method to finish
3359+
await client.SendInvocationAsync(nameof(LongRunningHub.SimpleMethod), nonBlocking: false).OrTimeout();
3360+
3361+
// simple hub method result
3362+
var result = await client.ReadAsync().OrTimeout();
3363+
var simpleCompletion = Assert.IsType<CompletionMessage>(result);
3364+
Assert.Equal(21L, simpleCompletion.Result);
3365+
3366+
// Release the long running hub method
3367+
tcsService.EndMethod.TrySetResult(null);
3368+
3369+
var hubActivator = serviceProvider.GetService<IHubActivator<LongRunningHub>>() as CustomHubActivator<LongRunningHub>;
3370+
3371+
await client.SendHubMessageAsync(new CancelInvocationMessage(streamInvocationId)).OrTimeout();
3372+
3373+
// Completion message for canceled Stream
3374+
await client.ReadAsync().OrTimeout();
3375+
3376+
// Shut down
3377+
client.Dispose();
3378+
3379+
await connectionHandlerTask.OrTimeout();
3380+
3381+
// OnConnectedAsync, SimpleMethod, LongRunningStream, OnDisconnectedAsync
3382+
Assert.Equal(4, hubActivator.ReleaseCount);
3383+
}
3384+
}
3385+
}
3386+
33283387
[Fact]
33293388
public async Task ServerSendsCloseWithErrorWhenConnectionClosedWithPartialMessage()
33303389
{

0 commit comments

Comments
 (0)