From 78b7e79f6e2b9fe7c278ca47674072d5e4b03448 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 20 Dec 2023 14:31:26 +0100 Subject: [PATCH 1/4] work in progress Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawConsumer.cs | 30 +++++++++++++++++++++------ 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 2517234a..d5debfed 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -87,14 +87,14 @@ internal void Validate() switch (ConsumerFilter) { - case { PostFilter: null }: + case {PostFilter: null}: throw new ArgumentException("PostFilter must be provided when Filter is set"); - case { Values.Count: 0 }: + case {Values.Count: 0}: throw new ArgumentException("Values must be provided when Filter is set"); } } - internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 }; + internal bool IsFiltering => ConsumerFilter is {Values.Count: > 0}; // it is needed to be able to add the subscriptions arguments // see consumerProperties["super-stream"] = SuperStream; @@ -177,6 +177,7 @@ protected override string GetStream() { return _config.Stream; } + public async Task StoreOffset(ulong offset) { await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false); @@ -528,7 +529,8 @@ private async Task Init() { Logger?.LogError( "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " + - "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, DumpEntityConfiguration(), + "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, + DumpEntityConfiguration(), chunkConsumed); throw new CrcException( @@ -537,7 +539,20 @@ private async Task Init() } } - await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); + try + { + await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); + } + catch (OperationCanceledException) + { + // The consumer is closing from the user but some chunks are still in the buffer + // simply skip the chunk since the Token.IsCancellationRequested is true + // the catch is needed to avoid to propagate the exception to the socket thread. + Logger?.LogWarning( + "OperationCanceledException. {EntityInfo} has been closed while consuming messages. " + + "Token.IsCancellationRequested: {IsCancellationRequested}", + DumpEntityConfiguration(), Token.IsCancellationRequested); + } }, async promotedAsActive => { if (_config.ConsumerUpdateListener != null) @@ -628,11 +643,14 @@ protected override async Task DeleteEntityFromTheServer(bool ignor } return ResponseCode.Ok; - } public override async Task Close() { + // when the consumer is closed we must be sure that the + // the subscription is completed to avoid problems with the connection + // It could happen when the closing is called just after the creation + await _completeSubscription.Task.ConfigureAwait(false); return await Shutdown(_config).ConfigureAwait(false); } From bb5af4cb01550cc636b4fac1c8d4001508ddcc1d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 21 Dec 2023 12:08:07 +0100 Subject: [PATCH 2/4] handle the cancellation token * during the handle deliver the consumer could receive a Token cancellation In this commit the consumer handle it with a log and exit. It will avoid to propagate the error and close the TCP connection * Add a lock around the IsOpen() function to make it thread-safe. In normal situations in does not matter. It is useful when a consumer is created and destroyed in a short time * Handle the Subscribe error. In case there is an error during the init. The error will be raised to the caller but the pool must be consistent Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/AbstractEntity.cs | 46 +++++++----- RabbitMQ.Stream.Client/Client.cs | 52 +++++++------ RabbitMQ.Stream.Client/RawConsumer.cs | 73 ++++++++++--------- RabbitMQ.Stream.Client/Reliable/Consumer.cs | 3 +- .../Reliable/ReliableBase.cs | 21 ++++-- RabbitMQ.Stream.Client/Subscribe.cs | 2 +- 6 files changed, 115 insertions(+), 82 deletions(-) diff --git a/RabbitMQ.Stream.Client/AbstractEntity.cs b/RabbitMQ.Stream.Client/AbstractEntity.cs index e737c5f4..961ab449 100644 --- a/RabbitMQ.Stream.Client/AbstractEntity.cs +++ b/RabbitMQ.Stream.Client/AbstractEntity.cs @@ -9,7 +9,6 @@ namespace RabbitMQ.Stream.Client { - public abstract record EntityCommonConfig { internal ConnectionsPool Pool { get; set; } @@ -65,6 +64,8 @@ private void MaybeCancelToken() /// protected abstract Task DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false); + private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1); + /// /// Internal close method. It is called by the public Close method. /// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed ) @@ -75,27 +76,35 @@ private void MaybeCancelToken() /// protected async Task Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) { - MaybeCancelToken(); - - if (!IsOpen()) // the client is already closed + await _writeLock.WaitAsync().ConfigureAwait(false); + try { - return ResponseCode.Ok; - } + MaybeCancelToken(); - _status = EntityStatus.Closed; - var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false); + if (!IsOpen()) // the client is already closed + { + return ResponseCode.Ok; + } - if (_client is { IsClosed: true }) - { + _status = EntityStatus.Closed; + var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false); + + if (_client is { IsClosed: true }) + { + return result; + } + + var closed = await _client.MaybeClose($"closing: {EntityId}", + GetStream(), config.Pool) + .ConfigureAwait(false); + ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}"); + Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration()); return result; } - - var closed = await _client.MaybeClose($"closing: {EntityId}", - GetStream(), config.Pool) - .ConfigureAwait(false); - ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}"); - Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration()); - return result; + finally + { + _writeLock.Release(); + } } protected void Dispose(bool disposing) @@ -120,7 +129,8 @@ protected void Dispose(bool disposing) } catch (Exception e) { - Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message); + Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), + e.Message); } finally { diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 6361370f..6286ead9 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -207,7 +207,6 @@ public static async Task Create(ClientParameters parameters, ILogger log client.connection = await Connection .Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger) .ConfigureAwait(false); - // exchange properties var peerPropertiesResponse = await client.Request(corr => new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false); @@ -374,31 +373,41 @@ await Request(corr => { await _poolSemaphore.WaitAsync().ConfigureAwait(false); var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList()); - SubscribeResponse response; + try { - consumers.Add(subscriptionId, - new ConsumerEvents( - deliverHandler, - consumerUpdateHandler)); + SubscribeResponse response; + try + { + consumers.Add(subscriptionId, + new ConsumerEvents( + deliverHandler, + consumerUpdateHandler)); + + response = await Request(corr => + new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit, + properties)).ConfigureAwait(false); + } + finally + { + _poolSemaphore.Release(); + } + + if (response.ResponseCode == ResponseCode.Ok) + return (subscriptionId, response); - response = await Request(corr => - new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit, - properties)).ConfigureAwait(false); + ClientExceptions.MaybeThrowException(response.ResponseCode, $"Error while creating consumer for stream {config.Stream}"); } - finally + catch (Exception e) { - _poolSemaphore.Release(); + // if the response code is not ok we need to remove the subscription + // and close the connection if necessary. + consumers.Remove(subscriptionId); + await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false); + throw new CreateConsumerException($"Error while creating consumer for stream {config.Stream}, error: {e.Message}"); } - if (response.ResponseCode == ResponseCode.Ok) - return (subscriptionId, response); - - // if the response code is not ok we need to remove the subscription - // and close the connection if necessary. - consumers.Remove(subscriptionId); - await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false); - return (subscriptionId, response); + return (subscriptionId, new SubscribeResponse(subscriptionId, ResponseCode.InternalError)); } public async Task Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) @@ -715,7 +724,7 @@ private async ValueTask SendHeartBeat() private void InternalClose() { _heartBeatHandler.Close(); - IsClosed = true; + // IsClosed = true; } private bool HasEntities() @@ -738,7 +747,6 @@ public async Task Close(string reason) return new CloseResponse(0, ResponseCode.Ok); } - InternalClose(); try { var result = @@ -761,6 +769,8 @@ public async Task Close(string reason) connection.Dispose(); } + InternalClose(); + return new CloseResponse(0, ResponseCode.Ok); } diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index d5debfed..82e6d775 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -87,14 +87,14 @@ internal void Validate() switch (ConsumerFilter) { - case {PostFilter: null}: + case { PostFilter: null }: throw new ArgumentException("PostFilter must be provided when Filter is set"); - case {Values.Count: 0}: + case { Values.Count: 0 }: throw new ArgumentException("Values must be provided when Filter is set"); } } - internal bool IsFiltering => ConsumerFilter is {Values.Count: > 0}; + internal bool IsFiltering => ConsumerFilter is { Values.Count: > 0 }; // it is needed to be able to add the subscriptions arguments // see consumerProperties["super-stream"] = SuperStream; @@ -406,6 +406,8 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // // we request the credit before process the check to keep the network busy try { + if (Token.IsCancellationRequested) + break; await _client.Credit(EntityId, 1).ConfigureAwait(false); } catch (InvalidOperationException) @@ -504,43 +506,43 @@ private async Task Init() consumerProperties, async deliver => { - chunkConsumed++; - // Send the chunk to the _chunksBuffer - // in this way the chunks are processed in a separate thread - // this wont' block the socket thread - // introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 - if (Token.IsCancellationRequested) + try { - // the consumer is closing from the user but some chunks are still in the buffer - // simply skip the chunk - Logger?.LogTrace( - "CancellationToken requested. The {EntityInfo} " + - "The chunk won't be processed", - DumpEntityConfiguration()); - return; - } + chunkConsumed++; + // Send the chunk to the _chunksBuffer + // in this way the chunks are processed in a separate thread + // this wont' block the socket thread + // introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250 + if (Token.IsCancellationRequested) + { + // the consumer is closing from the user but some chunks are still in the buffer + // simply skip the chunk + Logger?.LogTrace( + "CancellationToken requested. The {EntityInfo} " + + "The chunk won't be processed", + DumpEntityConfiguration()); + return; + } - if (_config.Crc32 is not null) - { - var crcCalculated = BitConverter.ToUInt32( - _config.Crc32.Hash(deliver.Chunk.Data.ToArray()) - ); - if (crcCalculated != deliver.Chunk.Crc) + if (_config.Crc32 is not null) { - Logger?.LogError( - "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " + - "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, - DumpEntityConfiguration(), - chunkConsumed); - - throw new CrcException( - $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " + - $"Chunk Consumed {chunkConsumed}"); + var crcCalculated = BitConverter.ToUInt32( + _config.Crc32.Hash(deliver.Chunk.Data.ToArray()) + ); + if (crcCalculated != deliver.Chunk.Crc) + { + Logger?.LogError( + "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " + + "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, + DumpEntityConfiguration(), + chunkConsumed); + + throw new CrcException( + $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " + + $"Chunk Consumed {chunkConsumed}"); + } } - } - try - { await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false); } catch (OperationCanceledException) @@ -575,6 +577,7 @@ private async Task Init() return _config.StoredOffsetSpec; } ).ConfigureAwait(false); + if (response.ResponseCode == ResponseCode.Ok) { _completeSubscription.SetResult(); diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index 01dbbfb6..527db9b3 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -211,7 +211,8 @@ public override async Task Close() public override string ToString() { - return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream} "; + return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream}, " + + $"client name: {_consumerConfig.ClientProvidedName} "; } public ConsumerInfo Info { get; } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index c71791c2..3ee4e0a5 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -3,7 +3,6 @@ // Copyright (c) 2007-2023 VMware, Inc. using System; - using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -37,7 +36,7 @@ protected ReliableConfig(StreamSystem streamSystem, string stream) public abstract class ReliableBase { protected readonly SemaphoreSlim SemaphoreSlim = new(1); - + private readonly object _lock = new(); protected bool _isOpen; protected bool _inReconnection; @@ -61,8 +60,11 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy) await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { - _isOpen = true; await CreateNewEntity(boot).ConfigureAwait(false); + lock (_lock) + { + _isOpen = true; + } } catch (Exception e) @@ -73,7 +75,11 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy) { // We consider the client as closed // since the exception is raised to the caller - _isOpen = false; + lock (_lock) + { + _isOpen = false; + } + throw; } } @@ -107,7 +113,7 @@ protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy) _inReconnection = true; try { - switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && _isOpen) + switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && IsOpen()) { case true: BaseLogger.LogInformation("{Identity} is disconnected. Client will try reconnect", ToString()); @@ -195,6 +201,9 @@ private void LogException(Exception exception) public bool IsOpen() { - return _isOpen; + lock (_lock) + { + return _isOpen; + } } } diff --git a/RabbitMQ.Stream.Client/Subscribe.cs b/RabbitMQ.Stream.Client/Subscribe.cs index d195f48a..dffd2063 100644 --- a/RabbitMQ.Stream.Client/Subscribe.cs +++ b/RabbitMQ.Stream.Client/Subscribe.cs @@ -134,7 +134,7 @@ public int Write(Span span) private readonly uint correlationId; private readonly ResponseCode responseCode; - private SubscribeResponse(uint correlationId, ResponseCode responseCode) + internal SubscribeResponse(uint correlationId, ResponseCode responseCode) { this.correlationId = correlationId; this.responseCode = responseCode; From ae0ea1cc6f6e389780b5830abc763affebbe1c77 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 21 Dec 2023 12:32:20 +0100 Subject: [PATCH 3/4] restore internal close Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Client.cs | 11 ++++++----- RabbitMQ.Stream.Client/Reliable/ReliableBase.cs | 3 ++- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 6286ead9..6ffe526c 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -396,7 +396,8 @@ await Request(corr => if (response.ResponseCode == ResponseCode.Ok) return (subscriptionId, response); - ClientExceptions.MaybeThrowException(response.ResponseCode, $"Error while creating consumer for stream {config.Stream}"); + ClientExceptions.MaybeThrowException(response.ResponseCode, + $"Error while creating consumer for stream {config.Stream}"); } catch (Exception e) { @@ -404,7 +405,8 @@ await Request(corr => // and close the connection if necessary. consumers.Remove(subscriptionId); await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false); - throw new CreateConsumerException($"Error while creating consumer for stream {config.Stream}, error: {e.Message}"); + throw new CreateConsumerException( + $"Error while creating consumer for stream {config.Stream}, error: {e.Message}"); } return (subscriptionId, new SubscribeResponse(subscriptionId, ResponseCode.InternalError)); @@ -724,7 +726,7 @@ private async ValueTask SendHeartBeat() private void InternalClose() { _heartBeatHandler.Close(); - // IsClosed = true; + IsClosed = true; } private bool HasEntities() @@ -747,6 +749,7 @@ public async Task Close(string reason) return new CloseResponse(0, ResponseCode.Ok); } + InternalClose(); try { var result = @@ -769,8 +772,6 @@ public async Task Close(string reason) connection.Dispose(); } - InternalClose(); - return new CloseResponse(0, ResponseCode.Ok); } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index 3ee4e0a5..abb6fde7 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -60,11 +60,12 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy) await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { - await CreateNewEntity(boot).ConfigureAwait(false); lock (_lock) { _isOpen = true; } + + await CreateNewEntity(boot).ConfigureAwait(false); } catch (Exception e) From 9edd363c5b0cdfbac10535a3c2bea1334c985f72 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 22 Dec 2023 15:28:58 +0100 Subject: [PATCH 4/4] Wait until the subscription if finished before on the close and dispose call. If a disposed is called just after the creation it waits. * RawConsumer: Add check if the consumer is open before dispach * Client: Add nextEntityId to have always a sequential ids. It avoids to use the same ids during the recycle * ConnectionPool.FindNextValidId/2:Change the way to give the ids. Given an `nextid` in input the function will give always the next value Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/AbstractEntity.cs | 43 ++++---- RabbitMQ.Stream.Client/Client.cs | 78 ++++++++------- RabbitMQ.Stream.Client/Connection.cs | 4 +- RabbitMQ.Stream.Client/ConnectionsPool.cs | 27 ++--- RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 1 - .../PublicAPI.Unshipped.txt | 1 + RabbitMQ.Stream.Client/RawConsumer.cs | 26 ++++- Tests/ClientTests.cs | 22 +++-- Tests/ConnectionsPoolTests.cs | 99 +++++++++++++++++++ 9 files changed, 216 insertions(+), 85 deletions(-) diff --git a/RabbitMQ.Stream.Client/AbstractEntity.cs b/RabbitMQ.Stream.Client/AbstractEntity.cs index 961ab449..ea58bf0e 100644 --- a/RabbitMQ.Stream.Client/AbstractEntity.cs +++ b/RabbitMQ.Stream.Client/AbstractEntity.cs @@ -18,7 +18,8 @@ internal enum EntityStatus { Open, Closed, - Disposed + Disposed, + Initializing } public interface IClosable @@ -64,7 +65,7 @@ private void MaybeCancelToken() /// protected abstract Task DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false); - private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1); + // private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1); /// /// Internal close method. It is called by the public Close method. @@ -76,35 +77,27 @@ private void MaybeCancelToken() /// protected async Task Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false) { - await _writeLock.WaitAsync().ConfigureAwait(false); - try + if (!IsOpen()) // the client is already closed { - MaybeCancelToken(); - - if (!IsOpen()) // the client is already closed - { - return ResponseCode.Ok; - } + return ResponseCode.Ok; + } - _status = EntityStatus.Closed; - var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false); + MaybeCancelToken(); - if (_client is { IsClosed: true }) - { - return result; - } + _status = EntityStatus.Closed; + var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false); - var closed = await _client.MaybeClose($"closing: {EntityId}", - GetStream(), config.Pool) - .ConfigureAwait(false); - ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}"); - Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration()); - return result; - } - finally + if (_client is { IsClosed: true }) { - _writeLock.Release(); + return result; } + + var closed = await _client.MaybeClose($"closing: {EntityId}", + GetStream(), config.Pool) + .ConfigureAwait(false); + ClientExceptions.MaybeThrowException(closed.ResponseCode, $"_client-close-Entity: {EntityId}"); + Logger.LogDebug("{EntityInfo} is closed", DumpEntityConfiguration()); + return result; } protected void Dispose(bool disposing) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 6ffe526c..8440f443 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -207,6 +207,7 @@ public static async Task Create(ClientParameters parameters, ILogger log client.connection = await Connection .Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger) .ConfigureAwait(false); + client.connection.ClientId = client.ClientId; // exchange properties var peerPropertiesResponse = await client.Request(corr => new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false); @@ -306,7 +307,7 @@ public ValueTask Publish(T msg) where T : struct, ICommand Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null) { await _poolSemaphore.WaitAsync().ConfigureAwait(false); - var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList()); + var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList(), IncrementEntityId()); DeclarePublisherResponse response; try @@ -320,7 +321,7 @@ public ValueTask Publish(T msg) where T : struct, ICommand _poolSemaphore.Release(); } - if (response.ResponseCode == ResponseCode.Ok || pool == null) + if (response.ResponseCode == ResponseCode.Ok) return (publisherId, response); // if the response code is not ok we need to remove the subscription @@ -357,59 +358,65 @@ await Request(corr => public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType, ushort initialCredit, Dictionary properties, Func deliverHandler, - Func> consumerUpdateHandler = null) + Func> consumerUpdateHandler = null, ConnectionsPool pool = null) { - return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType }, + return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType, Pool = pool }, initialCredit, properties, deliverHandler, consumerUpdateHandler).ConfigureAwait(false); } + private byte _nextEntityId = 0; + + // the entity id is a byte so we need to increment it and reset it when it reaches the max value + // to avoid to use always the same ids when producers and consumers are created + // so even there is a connection with one producer or consumer we need to increment the id + private byte IncrementEntityId() + { + lock (Obj) + { + var current = _nextEntityId; + _nextEntityId++; + if (_nextEntityId != byte.MaxValue) + return current; + _nextEntityId = 0; + return _nextEntityId; + } + } + public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config, ushort initialCredit, Dictionary properties, Func deliverHandler, Func> consumerUpdateHandler) { await _poolSemaphore.WaitAsync().ConfigureAwait(false); - var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList()); - + var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList(), IncrementEntityId()); + SubscribeResponse response; try { - SubscribeResponse response; - try - { - consumers.Add(subscriptionId, - new ConsumerEvents( - deliverHandler, - consumerUpdateHandler)); - - response = await Request(corr => - new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit, - properties)).ConfigureAwait(false); - } - finally - { - _poolSemaphore.Release(); - } - - if (response.ResponseCode == ResponseCode.Ok) - return (subscriptionId, response); + consumers.Add(subscriptionId, + new ConsumerEvents( + deliverHandler, + consumerUpdateHandler)); - ClientExceptions.MaybeThrowException(response.ResponseCode, - $"Error while creating consumer for stream {config.Stream}"); + response = await Request(corr => + new SubscribeRequest(corr, subscriptionId, config.Stream, config.OffsetSpec, initialCredit, + properties)).ConfigureAwait(false); } - catch (Exception e) + finally { - // if the response code is not ok we need to remove the subscription - // and close the connection if necessary. - consumers.Remove(subscriptionId); - await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false); - throw new CreateConsumerException( - $"Error while creating consumer for stream {config.Stream}, error: {e.Message}"); + _poolSemaphore.Release(); } - return (subscriptionId, new SubscribeResponse(subscriptionId, ResponseCode.InternalError)); + if (response.ResponseCode == ResponseCode.Ok) + return (subscriptionId, response); + + // if the response code is not ok we need to remove the subscription + // and close the connection if necessary. + consumers.Remove(subscriptionId); + await MaybeClose("Create Consumer Exception", config.Stream, config.Pool).ConfigureAwait(false); + return (subscriptionId, response); } public async Task Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) @@ -799,7 +806,6 @@ internal async Task MaybeClose(string reason, string stream, Conn _logger.LogInformation("Close connection for the {ClientId}", ClientId); // the client can be closed in an unexpected way so we need to remove it from the pool // so you will find pool.remove(ClientId) also to the disconnect event - // pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent pool.Remove(ClientId); await Close(reason).ConfigureAwait(false); } diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index 5ce034bb..8b65335c 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -33,7 +33,7 @@ public class Connection : IDisposable private CancellationToken Token => _cancelTokenSource.Token; internal int NumFrames => numFrames; - + internal string ClientId { get; set; } public bool IsClosed => isClosed; private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption) @@ -191,6 +191,8 @@ private async Task ProcessIncomingFrames() finally { isClosed = true; + _logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ", + ClientId, Token.IsCancellationRequested); // Mark the PipeReader as complete await reader.CompleteAsync(caught).ConfigureAwait(false); var t = closedCallback?.Invoke("TCP Connection Closed")!; diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 1c9daa0f..ad0cd244 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -100,30 +100,35 @@ public class ConnectionsPool { private static readonly object s_lock = new(); - internal static byte FindNextValidId(List ids) + internal static byte FindNextValidId(List ids, byte nextId = 0) { lock (s_lock) { - if (ids.Count == 0) - { - return 0; - } - // // we start with the recycle when we reach the max value // // in this way we can avoid to recycle the same ids in a short time ids.Sort(); - if (ids[^1] != byte.MaxValue) - return (byte)(ids[^1] + 1); + var l = ids.Where(b => b >= nextId).ToList(); + l.Sort(); + if (l.Count == 0) + { + // not necessary to start from 0 because the ids are recycled + // nextid is passed as parameter to avoid to start from 0 + // see client:IncrementEntityId/0 + return nextId; + } + + if (l[^1] != byte.MaxValue) + return (byte)(l[^1] + 1); for (var i = 0; i < ids.Count - 1; i++) { - if (ids[i + 1] - ids[i] > 1) + if (l[i + 1] - l[i] > 1) { - return (byte)(ids[i] + 1); + return (byte)(l[i] + 1); } } - return (byte)(ids[^1] + 1); + return (byte)(l[^1] + 1); } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 919b8adc..68d43a8c 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -186,7 +186,6 @@ RabbitMQ.Stream.Client.Client.QueryPublisherSequence(string publisherRef, string RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong offsetValue) -> System.Threading.Tasks.ValueTask RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> -RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> RabbitMQ.Stream.Client.ClientParameters RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 328502a6..72336582 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -33,6 +33,7 @@ RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlr RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask +RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)> RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 82e6d775..8c629536 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -266,7 +266,20 @@ async Task DispatchMessage(Message message, ulong i) // it is useful only in single active consumer if (IsPromotedAsActive) { - var canDispatch = true; + if (_status != EntityStatus.Open) + { + Logger?.LogDebug( + "{EntityInfo} is not active. message won't dispatched", + DumpEntityConfiguration()); + } + + // can dispatch only if the consumer is active + // it usually at this point the consumer is active + // but in rare case where the consumer is closed and open in a short + // time the ids could be the same to not problem we need just to skip the message + // Given the way how the ids are generated it is very rare to have the same ids + // it is just a safety check + var canDispatch = _status == EntityStatus.Open; if (_config.IsFiltering) { @@ -280,7 +293,7 @@ async Task DispatchMessage(Message message, ulong i) } catch (Exception e) { - Logger.LogError(e, + Logger?.LogError(e, "Error while filtering message. Message with offset {MessageOffset} won't be dispatched." + "Suggestion: review the PostFilter value function" + "{EntityInfo}", @@ -394,7 +407,7 @@ private void ProcessChunks() { // need to wait the subscription is completed // else the _subscriberId could be incorrect - await _completeSubscription.Task.ConfigureAwait(false); + _completeSubscription.Task.Wait(); try { while (!Token.IsCancellationRequested && @@ -500,6 +513,7 @@ private async Task Init() var chunkConsumed = 0; // this the default value for the consumer. _config.StoredOffsetSpec = _config.OffsetSpec; + _status = EntityStatus.Initializing; (EntityId, var response) = await _client.Subscribe( _config, _initialCredits, @@ -580,8 +594,9 @@ private async Task Init() if (response.ResponseCode == ResponseCode.Ok) { - _completeSubscription.SetResult(); _status = EntityStatus.Open; + // the subscription is completed so the parsechunk can start to process the chunks + _completeSubscription.SetResult(); return; } @@ -653,12 +668,13 @@ public override async Task Close() // when the consumer is closed we must be sure that the // the subscription is completed to avoid problems with the connection // It could happen when the closing is called just after the creation - await _completeSubscription.Task.ConfigureAwait(false); + _completeSubscription.Task.Wait(); return await Shutdown(_config).ConfigureAwait(false); } public void Dispose() { + _completeSubscription.Task.Wait(); try { Dispose(true); diff --git a/Tests/ClientTests.cs b/Tests/ClientTests.cs index 967b724a..32949f50 100644 --- a/Tests/ClientTests.cs +++ b/Tests/ClientTests.cs @@ -78,11 +78,8 @@ public async void MetadataUpdateIsHandled() var stream = Guid.NewGuid().ToString(); var testPassed = new TaskCompletionSource(); var clientParameters = new ClientParameters(); - clientParameters.OnMetadataUpdate += (update) => - { - testPassed.SetResult(update); - }; - + clientParameters.OnMetadataUpdate += (update) => { testPassed.SetResult(update); }; + var client = await Client.Create(clientParameters); await client.CreateStream(stream, new Dictionary()); Action> confirmed = (pubIds) => { }; @@ -110,11 +107,24 @@ public async void DeclarePublisherShouldReturnErrorCode() var publisherRef = Guid.NewGuid().ToString(); var (publisherId, result) = - await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored); + await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored, + new ConnectionsPool(0, 1)); Assert.Equal(ResponseCode.StreamDoesNotExist, result.ResponseCode); await client.Close("done"); } + [Fact] + public async void DeclareConsumerShouldReturnErrorCode() + { + var clientParameters = new ClientParameters { }; + var client = await Client.Create(clientParameters); + var (subId, subscribeResponse) = await client.Subscribe( + "this-stream-does-not-exist", new OffsetTypeLast(), 1, + new Dictionary(), null, null, new ConnectionsPool(0, 1)); + Assert.Equal(ResponseCode.StreamDoesNotExist, subscribeResponse.ResponseCode); + await client.Close("done"); + } + [Fact] public async void PublishShouldError() { diff --git a/Tests/ConnectionsPoolTests.cs b/Tests/ConnectionsPoolTests.cs index 9a82a9bf..f3db1706 100644 --- a/Tests/ConnectionsPoolTests.cs +++ b/Tests/ConnectionsPoolTests.cs @@ -539,6 +539,36 @@ await Assert.ThrowsAsync(async () => await RawConsumer.Creat await client.Close("byte"); } + [Fact] + public async void ValidateTheRecycleIDs() + { + var client = await Client.Create(new ClientParameters() { }); + const string Stream1 = "pool_test_stream_1_validate_ids"; + await client.CreateStream(Stream1, new Dictionary()); + + var pool = new ConnectionsPool(0, 50); + var metaDataInfo = await client.QueryMetadata(new[] { Stream1 }); + var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool }, + metaDataInfo.StreamInfos[Stream1]); + + for (var i = 0; i < 30; i++) + { + var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool }, + metaDataInfo.StreamInfos[Stream1]); + Assert.Equal(1, pool.ConnectionsCount); + Assert.Equal(2, pool.ActiveIdsCountForStream(Stream1)); + Assert.Equal(2, pool.ActiveIdsCount); + Assert.NotEmpty(ProducersIdsPerConnection(p1).ToList()); + var l = ProducersIdsPerConnection(p1).ToList(); + Assert.Equal(i + 1, l[1]); + await p1.Close(); + } + + await p.Close(); + await client.DeleteStream(Stream1).ConfigureAwait(false); + await client.Close("byte"); + } + /// /// In this test we need to check the pool consistency when there is an error during the creation of the producer or consumer /// and close the pending connections in case the pool is full. @@ -941,6 +971,27 @@ public void FindNextValidIdShouldReturnZeroGivenEmptyList() Assert.Equal(0, missing); } + [Fact] + public void FindNextValidIdShouldReturnTheSameIdGivenEmptyList() + { + var ids = new List(); + var missing = ConnectionsPool.FindNextValidId(ids, 5); + Assert.Equal(5, missing); + } + + [Fact] + public void FindNextValidIdShouldReturnTheNextGivenAStartId() + { + var ids = new List(); + var id = ConnectionsPool.FindNextValidId(ids, 255); + ids.Add(id); + Assert.Equal(255, id); + var next = ConnectionsPool.FindNextValidId(ids); + ids.Add(next); + Assert.Equal(0, next); + Assert.Equal(1, ConnectionsPool.FindNextValidId(ids)); + } + [Fact] public void FindNextValidIdShouldReturnOne() { @@ -1010,5 +1061,53 @@ public void RecycleIdsWhenTheMaxIsReached() Assert.Equal(7, nextValidId); ids.Add(7); } + + // The ids needs to be always consecutive even there are missing ids + [Fact] + public void RecycleIdsWhenTheMaxIsReachedAndStartWithAnId() + { + var ids = new List() + { + 0, + 1, + 2, + // 3 is missing + 4, + // 5 is missing + 6, + // 7 is missing + 8, + 9 + }; + // even there are missing ids the next valid id is the next one + var nextValidId = ConnectionsPool.FindNextValidId(ids, 200); + Assert.Equal(200, nextValidId); + + // even we start from 3 the next valid id is 10, since we start from the end + nextValidId = ConnectionsPool.FindNextValidId(ids, 3); + ids.Add(nextValidId); + Assert.Equal(10, nextValidId); + + nextValidId = ConnectionsPool.FindNextValidId(ids, 255); + ids.Add(nextValidId); + Assert.Equal(255, nextValidId); + + nextValidId = ConnectionsPool.FindNextValidId(ids, 0); + ids.Add(nextValidId); + Assert.Equal(3, nextValidId); + + nextValidId = ConnectionsPool.FindNextValidId(ids, 3); + ids.Add(nextValidId); + Assert.Equal(5, nextValidId); + + nextValidId = ConnectionsPool.FindNextValidId(ids, 5); + ids.Add(nextValidId); + Assert.Equal(7, nextValidId); + + nextValidId = ConnectionsPool.FindNextValidId(ids, 0); + ids.Add(nextValidId); + Assert.Equal(11, nextValidId); + + } } }