Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public void GlobalSetup()
var contextOptions = new HubConnectionContextOptions()
{
KeepAliveInterval = TimeSpan.Zero,
StreamBufferCapacity = 10,
};
_connectionContext = new NoErrorHubConnectionContext(connection, contextOptions, NullLoggerFactory.Instance);

Expand Down
9 changes: 9 additions & 0 deletions src/SignalR/server/Core/src/HubConnectionContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public HubConnectionContext(ConnectionContext connectionContext, HubConnectionCo

_systemClock = contextOptions.SystemClock ?? new SystemClock();
_lastSendTimeStamp = _systemClock.UtcNowTicks;

// We'll be avoiding using the semaphore when the limit is set to 1, so no need to allocate it
var maxInvokeLimit = contextOptions.MaximumParallelInvocations;
if (maxInvokeLimit != 1)
{
ActiveInvocationLimit = new SemaphoreSlim(maxInvokeLimit, maxInvokeLimit);
}
}

internal StreamTracker StreamTracker
Expand All @@ -93,6 +100,8 @@ internal StreamTracker StreamTracker

internal Exception? CloseException { get; private set; }

internal SemaphoreSlim? ActiveInvocationLimit { get; }

/// <summary>
/// Gets a <see cref="CancellationToken"/> that notifies when the connection is aborted.
/// </summary>
Expand Down
5 changes: 5 additions & 0 deletions src/SignalR/server/Core/src/HubConnectionContextOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,10 @@ public class HubConnectionContextOptions
public long? MaximumReceiveMessageSize { get; set; }

internal ISystemClock SystemClock { get; set; } = default!;

/// <summary>
/// Gets or sets the maximum parallel hub method invocations.
/// </summary>
public int MaximumParallelInvocations { get; set; } = 1;
}
}
13 changes: 9 additions & 4 deletions src/SignalR/server/Core/src/HubConnectionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Connections;
using Microsoft.AspNetCore.Internal;
Expand All @@ -31,6 +32,7 @@ public class HubConnectionHandler<THub> : ConnectionHandler where THub : Hub
private readonly HubDispatcher<THub> _dispatcher;
private readonly bool _enableDetailedErrors;
private readonly long? _maximumMessageSize;
private readonly int _maxParallelInvokes;

// Internal for testing
internal ISystemClock SystemClock { get; set; } = new SystemClock();
Expand Down Expand Up @@ -70,6 +72,7 @@ IServiceScopeFactory serviceScopeFactory
{
_maximumMessageSize = _hubOptions.MaximumReceiveMessageSize;
_enableDetailedErrors = _hubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
_maxParallelInvokes = _hubOptions.MaximumParallelInvocationsPerClient;

if (_hubOptions.HubFilters != null)
{
Expand All @@ -81,6 +84,7 @@ IServiceScopeFactory serviceScopeFactory
{
_maximumMessageSize = _globalHubOptions.MaximumReceiveMessageSize;
_enableDetailedErrors = _globalHubOptions.EnableDetailedErrors ?? _enableDetailedErrors;
_maxParallelInvokes = _globalHubOptions.MaximumParallelInvocationsPerClient;

if (_globalHubOptions.HubFilters != null)
{
Expand Down Expand Up @@ -118,6 +122,7 @@ public override async Task OnConnectedAsync(ConnectionContext connection)
StreamBufferCapacity = _hubOptions.StreamBufferCapacity ?? _globalHubOptions.StreamBufferCapacity ?? HubOptionsSetup.DefaultStreamBufferCapacity,
MaximumReceiveMessageSize = _maximumMessageSize,
SystemClock = SystemClock,
MaximumParallelInvocations = _maxParallelInvokes,
};

Log.ConnectedStarting(_logger);
Expand Down Expand Up @@ -237,7 +242,6 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
var protocol = connection.Protocol;
connection.BeginClientTimeout();


var binder = new HubConnectionBinder<THub>(_dispatcher, connection);

while (true)
Expand All @@ -260,8 +264,9 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)
{
while (protocol.TryParseMessage(ref buffer, binder, out var message))
{
messageReceived = true;
connection.StopClientTimeout();
// This lets us know the timeout has stopped and we need to re-enable it after dispatching the message
messageReceived = true;
await _dispatcher.DispatchMessageAsync(connection, message);
}

Expand All @@ -288,9 +293,9 @@ private async Task DispatchMessagesAsync(HubConnectionContext connection)

if (protocol.TryParseMessage(ref segment, binder, out var message))
{
messageReceived = true;
connection.StopClientTimeout();

// This lets us know the timeout has stopped and we need to re-enable it after dispatching the message
messageReceived = true;
await _dispatcher.DispatchMessageAsync(connection, message);
}
else if (overLength)
Expand Down
20 changes: 20 additions & 0 deletions src/SignalR/server/Core/src/HubOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace Microsoft.AspNetCore.SignalR
/// </summary>
public class HubOptions
{
private int _maximumParallelInvocationsPerClient = 1;

// HandshakeTimeout and KeepAliveInterval are set to null here to help identify when
// local hub options have been set. Global default values are set in HubOptionsSetup.
// SupportedProtocols being null is the true default value, and it represents support
Expand Down Expand Up @@ -53,5 +55,23 @@ public class HubOptions
public int? StreamBufferCapacity { get; set; } = null;

internal List<IHubFilter>? HubFilters { get; set; }

/// <summary>
/// By default a client is only allowed to invoke a single Hub method at a time.
/// Changing this property will allow clients to invoke multiple methods at the same time before queueing.
/// </summary>
public int MaximumParallelInvocationsPerClient
{
get => _maximumParallelInvocationsPerClient;
set
{
if (value < 1)
{
throw new ArgumentOutOfRangeException(nameof(MaximumParallelInvocationsPerClient));
}

_maximumParallelInvocationsPerClient = value;
}
}
}
}
1 change: 1 addition & 0 deletions src/SignalR/server/Core/src/HubOptionsSetup`T.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public void Configure(HubOptions<THub> options)
options.EnableDetailedErrors = _hubOptions.EnableDetailedErrors;
options.MaximumReceiveMessageSize = _hubOptions.MaximumReceiveMessageSize;
options.StreamBufferCapacity = _hubOptions.StreamBufferCapacity;
options.MaximumParallelInvocationsPerClient = _hubOptions.MaximumParallelInvocationsPerClient;

options.UserHasSetValues = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ private static class Log
private static readonly Action<ILogger, string, Exception> _invalidHubParameters =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(22, "InvalidHubParameters"), "Parameters to hub method '{HubMethod}' are incorrect.");

private static readonly Action<ILogger, string, Exception> _invocationIdInUse =
LoggerMessage.Define<string>(LogLevel.Debug, new EventId(23, "InvocationIdInUse"), "Invocation ID '{InvocationId}' is already in use.");

public static void ReceivedHubInvocation(ILogger logger, InvocationMessage invocationMessage)
{
_receivedHubInvocation(logger, invocationMessage, null);
Expand Down Expand Up @@ -188,6 +191,11 @@ public static void InvalidHubParameters(ILogger logger, string hubMethod, Except
{
_invalidHubParameters(logger, hubMethod, exception);
}

public static void InvocationIdInUse(ILogger logger, string InvocationId)
{
_invocationIdInUse(logger, InvocationId, null);
}
}
}
}
Loading