Skip to content

Commit 6ff17c8

Browse files
authored
Handle cancellation token during the consumer close (#339)
* During the handle deliver,y the consumer could receive a Token cancellation In this commit, the consumer handles it with a log and exit. It will avoid propagating the error and close the TCP connection * Add a lock around the IsOpen() function to make it thread-safe. In normal situations, it does not matter. It is useful when a consumer is created and destroyed in a short time * Handle the Subscribe error. In case there is an error during the init. The error will be raised to the caller, but the pool must be consistent * Wait until the subscription is finished before the close and dispose call. If a disposed is called just after the creation it waits. * RawConsumer: Add check if the consumer is open before dispatch * Client: Add nextEntityId to have always a sequential ids. It avoids using the same ids during the recycle * ConnectionPool.FindNextValidId/2:Change the way to give the ids. Given an `nextid` in input, the function will always give the next value --------- Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent cf8c927 commit 6ff17c8

File tree

12 files changed

+258
-74
lines changed

12 files changed

+258
-74
lines changed

RabbitMQ.Stream.Client/AbstractEntity.cs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
namespace RabbitMQ.Stream.Client
1111
{
12-
1312
public abstract record EntityCommonConfig
1413
{
1514
internal ConnectionsPool Pool { get; set; }
@@ -19,7 +18,8 @@ internal enum EntityStatus
1918
{
2019
Open,
2120
Closed,
22-
Disposed
21+
Disposed,
22+
Initializing
2323
}
2424

2525
public interface IClosable
@@ -65,6 +65,8 @@ private void MaybeCancelToken()
6565
/// <returns></returns>
6666
protected abstract Task<ResponseCode> DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);
6767

68+
// private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
69+
6870
/// <summary>
6971
/// Internal close method. It is called by the public Close method.
7072
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
@@ -75,13 +77,13 @@ private void MaybeCancelToken()
7577
/// <returns></returns>
7678
protected async Task<ResponseCode> Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
7779
{
78-
MaybeCancelToken();
79-
8080
if (!IsOpen()) // the client is already closed
8181
{
8282
return ResponseCode.Ok;
8383
}
8484

85+
MaybeCancelToken();
86+
8587
_status = EntityStatus.Closed;
8688
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
8789

@@ -120,7 +122,8 @@ protected void Dispose(bool disposing)
120122
}
121123
catch (Exception e)
122124
{
123-
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
125+
Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(),
126+
e.Message);
124127
}
125128
finally
126129
{

RabbitMQ.Stream.Client/Client.cs

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ public static async Task<Client> Create(ClientParameters parameters, ILogger log
207207
client.connection = await Connection
208208
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
209209
.ConfigureAwait(false);
210-
210+
client.connection.ClientId = client.ClientId;
211211
// exchange properties
212212
var peerPropertiesResponse = await client.Request<PeerPropertiesRequest, PeerPropertiesResponse>(corr =>
213213
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
@@ -307,7 +307,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
307307
Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null)
308308
{
309309
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
310-
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
310+
var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList(), IncrementEntityId());
311311
DeclarePublisherResponse response;
312312

313313
try
@@ -321,7 +321,7 @@ public ValueTask<bool> Publish<T>(T msg) where T : struct, ICommand
321321
_poolSemaphore.Release();
322322
}
323323

324-
if (response.ResponseCode == ResponseCode.Ok || pool == null)
324+
if (response.ResponseCode == ResponseCode.Ok)
325325
return (publisherId, response);
326326

327327
// if the response code is not ok we need to remove the subscription
@@ -358,22 +358,40 @@ await Request<DeletePublisherRequest, DeletePublisherResponse>(corr =>
358358
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
359359
ushort initialCredit,
360360
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
361-
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
361+
Func<bool, Task<IOffsetType>> consumerUpdateHandler = null, ConnectionsPool pool = null)
362362
{
363-
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
363+
return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType, Pool = pool },
364364
initialCredit,
365365
properties,
366366
deliverHandler,
367367
consumerUpdateHandler).ConfigureAwait(false);
368368
}
369369

370+
private byte _nextEntityId = 0;
371+
372+
// the entity id is a byte so we need to increment it and reset it when it reaches the max value
373+
// to avoid to use always the same ids when producers and consumers are created
374+
// so even there is a connection with one producer or consumer we need to increment the id
375+
private byte IncrementEntityId()
376+
{
377+
lock (Obj)
378+
{
379+
var current = _nextEntityId;
380+
_nextEntityId++;
381+
if (_nextEntityId != byte.MaxValue)
382+
return current;
383+
_nextEntityId = 0;
384+
return _nextEntityId;
385+
}
386+
}
387+
370388
public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config,
371389
ushort initialCredit,
372390
Dictionary<string, string> properties, Func<Deliver, Task> deliverHandler,
373391
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
374392
{
375393
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
376-
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
394+
var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList(), IncrementEntityId());
377395
SubscribeResponse response;
378396
try
379397
{
@@ -788,7 +806,6 @@ internal async Task<CloseResponse> MaybeClose(string reason, string stream, Conn
788806
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
789807
// the client can be closed in an unexpected way so we need to remove it from the pool
790808
// so you will find pool.remove(ClientId) also to the disconnect event
791-
// pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
792809
pool.Remove(ClientId);
793810
await Close(reason).ConfigureAwait(false);
794811
}

RabbitMQ.Stream.Client/Connection.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public class Connection : IDisposable
3333
private CancellationToken Token => _cancelTokenSource.Token;
3434

3535
internal int NumFrames => numFrames;
36-
36+
internal string ClientId { get; set; }
3737
public bool IsClosed => isClosed;
3838

3939
private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
@@ -191,6 +191,8 @@ private async Task ProcessIncomingFrames()
191191
finally
192192
{
193193
isClosed = true;
194+
_logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
195+
ClientId, Token.IsCancellationRequested);
194196
// Mark the PipeReader as complete
195197
await reader.CompleteAsync(caught).ConfigureAwait(false);
196198
var t = closedCallback?.Invoke("TCP Connection Closed")!;

RabbitMQ.Stream.Client/ConnectionsPool.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,30 +100,35 @@ public class ConnectionsPool
100100
{
101101
private static readonly object s_lock = new();
102102

103-
internal static byte FindNextValidId(List<byte> ids)
103+
internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
104104
{
105105
lock (s_lock)
106106
{
107-
if (ids.Count == 0)
108-
{
109-
return 0;
110-
}
111-
112107
// // we start with the recycle when we reach the max value
113108
// // in this way we can avoid to recycle the same ids in a short time
114109
ids.Sort();
115-
if (ids[^1] != byte.MaxValue)
116-
return (byte)(ids[^1] + 1);
110+
var l = ids.Where(b => b >= nextId).ToList();
111+
l.Sort();
112+
if (l.Count == 0)
113+
{
114+
// not necessary to start from 0 because the ids are recycled
115+
// nextid is passed as parameter to avoid to start from 0
116+
// see client:IncrementEntityId/0
117+
return nextId;
118+
}
119+
120+
if (l[^1] != byte.MaxValue)
121+
return (byte)(l[^1] + 1);
117122

118123
for (var i = 0; i < ids.Count - 1; i++)
119124
{
120-
if (ids[i + 1] - ids[i] > 1)
125+
if (l[i + 1] - l[i] > 1)
121126
{
122-
return (byte)(ids[i] + 1);
127+
return (byte)(l[i] + 1);
123128
}
124129
}
125130

126-
return (byte)(ids[^1] + 1);
131+
return (byte)(l[^1] + 1);
127132
}
128133
}
129134

RabbitMQ.Stream.Client/PublicAPI.Shipped.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,6 @@ RabbitMQ.Stream.Client.Client.QueryPublisherSequence(string publisherRef, string
186186
RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong offsetValue) -> System.Threading.Tasks.ValueTask<bool>
187187
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task<bool>
188188
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
189-
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
190189
RabbitMQ.Stream.Client.ClientParameters
191190
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
192191
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlr
3333
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.CommandVersionsResponse>
3434
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.RouteQueryResponse>
3535
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask<RabbitMQ.Stream.Client.StreamStatsResponse>
36+
RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
3637
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.UnsubscribeResponse>
3738
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
3839
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void

0 commit comments

Comments
 (0)