diff --git a/RabbitMQ.Stream.Client/AbstractEntity.cs b/RabbitMQ.Stream.Client/AbstractEntity.cs
index e737c5f4..ea58bf0e 100644
--- a/RabbitMQ.Stream.Client/AbstractEntity.cs
+++ b/RabbitMQ.Stream.Client/AbstractEntity.cs
@@ -9,7 +9,6 @@
namespace RabbitMQ.Stream.Client
{
-
public abstract record EntityCommonConfig
{
internal ConnectionsPool Pool { get; set; }
@@ -19,7 +18,8 @@ internal enum EntityStatus
{
Open,
Closed,
- Disposed
+ Disposed,
+ Initializing
}
public interface IClosable
@@ -65,6 +65,8 @@ private void MaybeCancelToken()
///
protected abstract Task DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false);
+ // private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
+
///
/// Internal close method. It is called by the public Close method.
/// Set the status to closed and remove the producer or consumer from the server ( if it is not already removed )
@@ -75,13 +77,13 @@ private void MaybeCancelToken()
///
protected async Task Shutdown(EntityCommonConfig config, bool ignoreIfAlreadyDeleted = false)
{
- MaybeCancelToken();
-
if (!IsOpen()) // the client is already closed
{
return ResponseCode.Ok;
}
+ MaybeCancelToken();
+
_status = EntityStatus.Closed;
var result = await DeleteEntityFromTheServer(ignoreIfAlreadyDeleted).ConfigureAwait(false);
@@ -120,7 +122,8 @@ protected void Dispose(bool disposing)
}
catch (Exception e)
{
- Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(), e.Message);
+ Logger?.LogWarning("Failed to close {EntityInfo}, error {Error} ", DumpEntityConfiguration(),
+ e.Message);
}
finally
{
diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs
index 6361370f..8440f443 100644
--- a/RabbitMQ.Stream.Client/Client.cs
+++ b/RabbitMQ.Stream.Client/Client.cs
@@ -207,7 +207,7 @@ public static async Task Create(ClientParameters parameters, ILogger log
client.connection = await Connection
.Create(parameters.Endpoint, client.HandleIncoming, client.HandleClosed, parameters.Ssl, logger)
.ConfigureAwait(false);
-
+ client.connection.ClientId = client.ClientId;
// exchange properties
var peerPropertiesResponse = await client.Request(corr =>
new PeerPropertiesRequest(corr, parameters.Properties)).ConfigureAwait(false);
@@ -307,7 +307,7 @@ public ValueTask Publish(T msg) where T : struct, ICommand
Action<(ulong, ResponseCode)[]> errorCallback, ConnectionsPool pool = null)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
- var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList());
+ var publisherId = ConnectionsPool.FindNextValidId(publishers.Keys.ToList(), IncrementEntityId());
DeclarePublisherResponse response;
try
@@ -321,7 +321,7 @@ public ValueTask Publish(T msg) where T : struct, ICommand
_poolSemaphore.Release();
}
- if (response.ResponseCode == ResponseCode.Ok || pool == null)
+ if (response.ResponseCode == ResponseCode.Ok)
return (publisherId, response);
// if the response code is not ok we need to remove the subscription
@@ -358,22 +358,40 @@ await Request(corr =>
public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType,
ushort initialCredit,
Dictionary properties, Func deliverHandler,
- Func<bool, Task<IOffsetType>> consumerUpdateHandler = null)
+ Func<bool, Task<IOffsetType>> consumerUpdateHandler = null, ConnectionsPool pool = null)
{
- return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType },
+ return await Subscribe(new RawConsumerConfig(stream) { OffsetSpec = offsetType, Pool = pool },
initialCredit,
properties,
deliverHandler,
consumerUpdateHandler).ConfigureAwait(false);
}
+ private byte _nextEntityId = 0;
+
+ // The entity id is a byte, so we increment it and reset it when it reaches the max value.
+ // This avoids always reusing the same ids when producers and consumers are created:
+ // even on a connection with a single producer or consumer the id keeps advancing.
+ private byte IncrementEntityId()
+ {
+ lock (Obj)
+ {
+ var current = _nextEntityId;
+ _nextEntityId++;
+ if (_nextEntityId != byte.MaxValue)
+ return current;
+ _nextEntityId = 0;
+ return _nextEntityId;
+ }
+ }
+
public async Task<(byte, SubscribeResponse)> Subscribe(RawConsumerConfig config,
ushort initialCredit,
Dictionary properties, Func deliverHandler,
Func<bool, Task<IOffsetType>> consumerUpdateHandler)
{
await _poolSemaphore.WaitAsync().ConfigureAwait(false);
- var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList());
+ var subscriptionId = ConnectionsPool.FindNextValidId(consumers.Keys.ToList(), IncrementEntityId());
SubscribeResponse response;
try
{
@@ -788,7 +806,6 @@ internal async Task MaybeClose(string reason, string stream, Conn
_logger.LogInformation("Close connection for the {ClientId}", ClientId);
// the client can be closed in an unexpected way so we need to remove it from the pool
// so you will find pool.remove(ClientId) also to the disconnect event
- // pool.remove(ClientId) is a duplicate call here but it is ok. The pool is idempotent
pool.Remove(ClientId);
await Close(reason).ConfigureAwait(false);
}
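The IncrementEntityId helper above keeps a per-client rotating counter so newly created producers and consumers do not immediately reuse an id that was just freed. A minimal sketch of the same idea (EntityIdRotator is an illustrative name, not part of the client); the returned value is then handed to the pool's FindNextValidId as a starting hint, see the ConnectionsPool hunk below:

    class EntityIdRotator
    {
        private readonly object _lock = new();
        private byte _next;

        // Returns the current value and advances the counter, wrapping around near
        // byte.MaxValue, so consecutive create/close cycles get different ids even
        // when only one producer or consumer lives on the connection.
        public byte Next()
        {
            lock (_lock)
            {
                var current = _next;
                _next = (byte)(_next == byte.MaxValue - 1 ? 0 : _next + 1);
                return current;
            }
        }
    }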
diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs
index 5ce034bb..8b65335c 100644
--- a/RabbitMQ.Stream.Client/Connection.cs
+++ b/RabbitMQ.Stream.Client/Connection.cs
@@ -33,7 +33,7 @@ public class Connection : IDisposable
private CancellationToken Token => _cancelTokenSource.Token;
internal int NumFrames => numFrames;
-
+ internal string ClientId { get; set; }
public bool IsClosed => isClosed;
private static System.IO.Stream MaybeTcpUpgrade(NetworkStream networkStream, SslOption sslOption)
@@ -191,6 +191,8 @@ private async Task ProcessIncomingFrames()
finally
{
isClosed = true;
+ _logger?.LogDebug("TCP Connection Closed ClientId: {ClientId} is IsCancellationRequested {Token} ",
+ ClientId, Token.IsCancellationRequested);
// Mark the PipeReader as complete
await reader.CompleteAsync(caught).ConfigureAwait(false);
var t = closedCallback?.Invoke("TCP Connection Closed")!;
diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs
index 1c9daa0f..ad0cd244 100644
--- a/RabbitMQ.Stream.Client/ConnectionsPool.cs
+++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs
@@ -100,30 +100,35 @@ public class ConnectionsPool
{
private static readonly object s_lock = new();
- internal static byte FindNextValidId(List<byte> ids)
+ internal static byte FindNextValidId(List<byte> ids, byte nextId = 0)
{
lock (s_lock)
{
- if (ids.Count == 0)
- {
- return 0;
- }
-
// // we start with the recycle when we reach the max value
// // in this way we can avoid to recycle the same ids in a short time
ids.Sort();
- if (ids[^1] != byte.MaxValue)
- return (byte)(ids[^1] + 1);
+ var l = ids.Where(b => b >= nextId).ToList();
+ l.Sort();
+ if (l.Count == 0)
+ {
+ // no id at or above nextId is in use, so nextId itself can be returned;
+ // nextId is passed as a parameter to avoid always starting from 0,
+ // see Client.IncrementEntityId()
+ return nextId;
+ }
+
+ if (l[^1] != byte.MaxValue)
+ return (byte)(l[^1] + 1);
for (var i = 0; i < ids.Count - 1; i++)
{
- if (ids[i + 1] - ids[i] > 1)
+ if (l[i + 1] - l[i] > 1)
{
- return (byte)(ids[i] + 1);
+ return (byte)(l[i] + 1);
}
}
- return (byte)(ids[^1] + 1);
+ return (byte)(l[^1] + 1);
}
}
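FindNextValidId now prefers ids at or above the supplied hint and only falls back to the lower range once the upper range is exhausted. A standalone sketch of that allocation strategy, assuming a List<byte> of ids currently in use (IdAllocator and Next are illustrative names):

    using System.Collections.Generic;
    using System.Linq;

    static class IdAllocator
    {
        // Next free id, preferring ids >= startFrom; the lower range is reused only
        // after the upper range (up to byte.MaxValue) has been consumed.
        public static byte Next(List<byte> usedIds, byte startFrom = 0)
        {
            var upper = usedIds.Where(b => b >= startFrom).OrderBy(b => b).ToList();
            if (upper.Count == 0)
                return startFrom;                 // nothing at or above the hint
            if (upper[^1] != byte.MaxValue)
                return (byte)(upper[^1] + 1);     // room above the highest used id
            for (var i = 0; i < upper.Count - 1; i++)
                if (upper[i + 1] - upper[i] > 1)
                    return (byte)(upper[i] + 1);  // first gap in the upper range
            return (byte)(upper[^1] + 1);         // 255 + 1 wraps to 0
        }
    }

For example, Next(new List<byte>(), 5) returns 5, and with { 255 } already in use Next(..., 255) wraps back into the lower range, which is what the tests at the bottom of this diff exercise.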
diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
index 919b8adc..68d43a8c 100644
--- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
+++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt
@@ -186,7 +186,6 @@ RabbitMQ.Stream.Client.Client.QueryPublisherSequence(string publisherRef, string
RabbitMQ.Stream.Client.Client.StoreOffset(string reference, string stream, ulong offsetValue) -> System.Threading.Tasks.ValueTask
RabbitMQ.Stream.Client.Client.StreamExists(string stream) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Client.Subscribe(RabbitMQ.Stream.Client.RawConsumerConfig config, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
-RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary properties, System.Func deliverHandler, System.Func> consumerUpdateHandler = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.ClientParameters
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.get -> RabbitMQ.Stream.Client.AddressResolver
RabbitMQ.Stream.Client.ClientParameters.AddressResolver.set -> void
diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
index 328502a6..72336582 100644
--- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
@@ -33,6 +33,7 @@ RabbitMQ.Stream.Client.Client.DeletePublisher(byte publisherId, bool ignoreIfAlr
RabbitMQ.Stream.Client.Client.ExchangeVersions() -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Client.QueryRoute(string superStream, string routingKey) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Client.StreamStats(string stream) -> System.Threading.Tasks.ValueTask
+RabbitMQ.Stream.Client.Client.Subscribe(string stream, RabbitMQ.Stream.Client.IOffsetType offsetType, ushort initialCredit, System.Collections.Generic.Dictionary<string, string> properties, System.Func<RabbitMQ.Stream.Client.Deliver, System.Threading.Tasks.Task> deliverHandler, System.Func<bool, System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IOffsetType>> consumerUpdateHandler = null, RabbitMQ.Stream.Client.ConnectionsPool pool = null) -> System.Threading.Tasks.Task<(byte, RabbitMQ.Stream.Client.SubscribeResponse)>
RabbitMQ.Stream.Client.Client.Unsubscribe(byte subscriptionId, bool ignoreIfAlreadyRemoved = false) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism
RabbitMQ.Stream.Client.ClientParameters.AuthMechanism.set -> void
diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs
index 2517234a..8c629536 100644
--- a/RabbitMQ.Stream.Client/RawConsumer.cs
+++ b/RabbitMQ.Stream.Client/RawConsumer.cs
@@ -177,6 +177,7 @@ protected override string GetStream()
{
return _config.Stream;
}
+
public async Task StoreOffset(ulong offset)
{
await _client.StoreOffset(_config.Reference, _config.Stream, offset).ConfigureAwait(false);
@@ -265,7 +266,20 @@ async Task DispatchMessage(Message message, ulong i)
// it is useful only in single active consumer
if (IsPromotedAsActive)
{
- var canDispatch = true;
+ if (_status != EntityStatus.Open)
+ {
+ Logger?.LogDebug(
+ "{EntityInfo} is not active. message won't dispatched",
+ DumpEntityConfiguration());
+ }
+
+ // Dispatch only if the consumer is active.
+ // At this point the consumer is usually active, but in the rare case where it is
+ // closed and reopened in a short time the subscription ids could be the same;
+ // in that case we just skip the message.
+ // Given how the ids are generated it is very unlikely to get the same id twice,
+ // so this is only a safety check.
+ var canDispatch = _status == EntityStatus.Open;
if (_config.IsFiltering)
{
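The guard above replaces the unconditional canDispatch = true: a delivery is handed to the user handler only while the consumer status is Open. A minimal sketch of that pattern with illustrative types (GuardedDispatcher is not part of the client):

    using System;
    using System.Threading.Tasks;

    enum EntityStatus { Initializing, Open, Closed, Disposed }

    class GuardedDispatcher
    {
        private EntityStatus _status = EntityStatus.Initializing;
        private readonly Func<string, Task> _handler;

        public GuardedDispatcher(Func<string, Task> handler) => _handler = handler;

        public void MarkOpen() => _status = EntityStatus.Open;
        public void MarkClosed() => _status = EntityStatus.Closed;

        // Deliveries that arrive for a recycled subscription id after a close/reopen
        // are skipped instead of reaching the user callback.
        public async Task DispatchAsync(string message)
        {
            if (_status != EntityStatus.Open)
                return;
            await _handler(message).ConfigureAwait(false);
        }
    }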
@@ -279,7 +293,7 @@ async Task DispatchMessage(Message message, ulong i)
}
catch (Exception e)
{
- Logger.LogError(e,
+ Logger?.LogError(e,
"Error while filtering message. Message with offset {MessageOffset} won't be dispatched."
+ "Suggestion: review the PostFilter value function"
+ "{EntityInfo}",
@@ -393,7 +407,7 @@ private void ProcessChunks()
{
// need to wait the subscription is completed
// else the _subscriberId could be incorrect
- await _completeSubscription.Task.ConfigureAwait(false);
+ _completeSubscription.Task.Wait();
try
{
while (!Token.IsCancellationRequested &&
@@ -405,6 +419,8 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) //
// we request the credit before process the check to keep the network busy
try
{
+ if (Token.IsCancellationRequested)
+ break;
await _client.Credit(EntityId, 1).ConfigureAwait(false);
}
catch (InvalidOperationException)
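ProcessChunks now waits on _completeSubscription before requesting credits, so the subscription id it uses is the final one returned by the server. A minimal sketch of that gating idea, shown here with an asynchronous await (the hunk above deliberately blocks with Task.Wait() inside the dedicated processing task); ChunkProcessor and its members are illustrative names:

    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    class ChunkProcessor
    {
        private readonly TaskCompletionSource _subscribed =
            new(TaskCreationOptions.RunContinuationsAsynchronously);
        private readonly Channel<int> _chunks = Channel.CreateBounded<int>(10);

        // Called once the Subscribe response has been received.
        public void SubscriptionCompleted() => _subscribed.SetResult();

        public ChannelWriter<int> Writer => _chunks.Writer;

        public async Task ProcessAsync(CancellationToken token)
        {
            // wait until the subscription handshake is done, otherwise the
            // subscription id used for credit requests could still change
            await _subscribed.Task.ConfigureAwait(false);
            while (!token.IsCancellationRequested &&
                   await _chunks.Reader.WaitToReadAsync(token).ConfigureAwait(false))
            {
                while (_chunks.Reader.TryRead(out var chunk))
                {
                    // request one credit and hand the chunk to the user handler here
                    _ = chunk;
                }
            }
        }
    }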
@@ -497,47 +513,62 @@ private async Task Init()
var chunkConsumed = 0;
// this the default value for the consumer.
_config.StoredOffsetSpec = _config.OffsetSpec;
+ _status = EntityStatus.Initializing;
(EntityId, var response) = await _client.Subscribe(
_config,
_initialCredits,
consumerProperties,
async deliver =>
{
- chunkConsumed++;
- // Send the chunk to the _chunksBuffer
- // in this way the chunks are processed in a separate thread
- // this wont' block the socket thread
- // introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
- if (Token.IsCancellationRequested)
+ try
{
- // the consumer is closing from the user but some chunks are still in the buffer
- // simply skip the chunk
- Logger?.LogTrace(
- "CancellationToken requested. The {EntityInfo} " +
- "The chunk won't be processed",
- DumpEntityConfiguration());
- return;
- }
+ chunkConsumed++;
+ // Send the chunk to the _chunksBuffer
+ // in this way the chunks are processed in a separate thread
+ // this won't block the socket thread
+ // introduced https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/pull/250
+ if (Token.IsCancellationRequested)
+ {
+ // the user is closing the consumer but some chunks are still in the buffer
+ // simply skip the chunk
+ Logger?.LogTrace(
+ "CancellationToken requested. The {EntityInfo} " +
+ "The chunk won't be processed",
+ DumpEntityConfiguration());
+ return;
+ }
- if (_config.Crc32 is not null)
- {
- var crcCalculated = BitConverter.ToUInt32(
- _config.Crc32.Hash(deliver.Chunk.Data.ToArray())
- );
- if (crcCalculated != deliver.Chunk.Crc)
+ if (_config.Crc32 is not null)
{
- Logger?.LogError(
- "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " +
- "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated, DumpEntityConfiguration(),
- chunkConsumed);
-
- throw new CrcException(
- $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
- $"Chunk Consumed {chunkConsumed}");
+ var crcCalculated = BitConverter.ToUInt32(
+ _config.Crc32.Hash(deliver.Chunk.Data.ToArray())
+ );
+ if (crcCalculated != deliver.Chunk.Crc)
+ {
+ Logger?.LogError(
+ "CRC32 does not match, server crc: {Crc}, local crc: {CrcCalculated}, {EntityInfo}, " +
+ "Chunk Consumed {ChunkConsumed}", deliver.Chunk.Crc, crcCalculated,
+ DumpEntityConfiguration(),
+ chunkConsumed);
+
+ throw new CrcException(
+ $"CRC32 does not match, server crc: {deliver.Chunk.Crc}, local crc: {crcCalculated}, {DumpEntityConfiguration()}, " +
+ $"Chunk Consumed {chunkConsumed}");
+ }
}
- }
- await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
+ await _chunksBuffer.Writer.WriteAsync(deliver.Chunk, Token).ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ // The user is closing the consumer but some chunks are still in the buffer:
+ // simply skip the chunk, since Token.IsCancellationRequested is true.
+ // The catch is needed to avoid propagating the exception to the socket thread.
+ Logger?.LogWarning(
+ "OperationCanceledException. {EntityInfo} has been closed while consuming messages. " +
+ "Token.IsCancellationRequested: {IsCancellationRequested}",
+ DumpEntityConfiguration(), Token.IsCancellationRequested);
+ }
}, async promotedAsActive =>
{
if (_config.ConsumerUpdateListener != null)
@@ -560,10 +591,12 @@ private async Task Init()
return _config.StoredOffsetSpec;
}
).ConfigureAwait(false);
+
if (response.ResponseCode == ResponseCode.Ok)
{
- _completeSubscription.SetResult();
_status = EntityStatus.Open;
+ // the subscription is completed, so ProcessChunks can start processing the chunks
+ _completeSubscription.SetResult();
return;
}
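The deliver callback above is now wrapped in a try/catch so that a cancellation raised while writing into the chunk buffer does not propagate into the socket read loop. A compact sketch of that protection, assuming a bounded channel as the buffer (DeliverBuffer and OnDeliverAsync are illustrative names):

    using System;
    using System.Threading;
    using System.Threading.Channels;
    using System.Threading.Tasks;

    class DeliverBuffer
    {
        private readonly Channel<byte[]> _buffer = Channel.CreateBounded<byte[]>(10);
        private readonly CancellationToken _token;

        public DeliverBuffer(CancellationToken token) => _token = token;

        // Invoked from the connection's frame-processing code: exceptions must not
        // escape, so cancellation during a pending write just drops the chunk.
        public async Task OnDeliverAsync(byte[] chunk)
        {
            try
            {
                if (_token.IsCancellationRequested)
                    return;                       // consumer is closing: skip the chunk
                await _buffer.Writer.WriteAsync(chunk, _token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // closed while the write was pending: drop the chunk rather than
                // letting the exception reach the socket thread
            }
        }
    }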
@@ -628,16 +661,20 @@ protected override async Task DeleteEntityFromTheServer(bool ignor
}
return ResponseCode.Ok;
-
}
public override async Task Close()
{
+ // when the consumer is closed we must be sure that the
+ // subscription is completed, to avoid problems with the connection.
+ // This can happen when Close is called right after the creation.
+ _completeSubscription.Task.Wait();
return await Shutdown(_config).ConfigureAwait(false);
}
public void Dispose()
{
+ _completeSubscription.Task.Wait();
try
{
Dispose(true);
diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs
index 01dbbfb6..527db9b3 100644
--- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs
+++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs
@@ -211,7 +211,8 @@ public override async Task Close()
public override string ToString()
{
- return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream} ";
+ return $"Consumer reference: {_consumerConfig.Reference}, stream: {_consumerConfig.Stream}, " +
+ $"client name: {_consumerConfig.ClientProvidedName} ";
}
public ConsumerInfo Info { get; }
diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
index c71791c2..abb6fde7 100644
--- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
+++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
@@ -3,7 +3,6 @@
// Copyright (c) 2007-2023 VMware, Inc.
using System;
-
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
@@ -37,7 +36,7 @@ protected ReliableConfig(StreamSystem streamSystem, string stream)
public abstract class ReliableBase
{
protected readonly SemaphoreSlim SemaphoreSlim = new(1);
-
+ private readonly object _lock = new();
protected bool _isOpen;
protected bool _inReconnection;
@@ -61,7 +60,11 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
- _isOpen = true;
+ lock (_lock)
+ {
+ _isOpen = true;
+ }
+
await CreateNewEntity(boot).ConfigureAwait(false);
}
@@ -73,7 +76,11 @@ private async Task Init(bool boot, IReconnectStrategy reconnectStrategy)
{
// We consider the client as closed
// since the exception is raised to the caller
- _isOpen = false;
+ lock (_lock)
+ {
+ _isOpen = false;
+ }
+
throw;
}
}
@@ -107,7 +114,7 @@ protected async Task TryToReconnect(IReconnectStrategy reconnectStrategy)
_inReconnection = true;
try
{
- switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && _isOpen)
+ switch (await reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false) && IsOpen())
{
case true:
BaseLogger.LogInformation("{Identity} is disconnected. Client will try reconnect", ToString());
@@ -195,6 +202,9 @@ private void LogException(Exception exception)
public bool IsOpen()
{
- return _isOpen;
+ lock (_lock)
+ {
+ return _isOpen;
+ }
}
}
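The _isOpen flag in ReliableBase is now read and written under the same lock, so the reconnect path and the close path agree on the state they observe. A minimal sketch of that pattern (ReliableFlag is an illustrative name); for a single bool, Volatile.Read/Write would also work, but a lock keeps the flag consistent with any further state guarded alongside it:

    class ReliableFlag
    {
        private readonly object _lock = new();
        private bool _isOpen;

        public void SetOpen(bool value)
        {
            lock (_lock)
            {
                _isOpen = value;   // set during Init, cleared when creation fails
            }
        }

        public bool IsOpen()
        {
            lock (_lock)
            {
                return _isOpen;    // read before deciding whether to reconnect
            }
        }
    }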
diff --git a/RabbitMQ.Stream.Client/Subscribe.cs b/RabbitMQ.Stream.Client/Subscribe.cs
index d195f48a..dffd2063 100644
--- a/RabbitMQ.Stream.Client/Subscribe.cs
+++ b/RabbitMQ.Stream.Client/Subscribe.cs
@@ -134,7 +134,7 @@ public int Write(Span span)
private readonly uint correlationId;
private readonly ResponseCode responseCode;
- private SubscribeResponse(uint correlationId, ResponseCode responseCode)
+ internal SubscribeResponse(uint correlationId, ResponseCode responseCode)
{
this.correlationId = correlationId;
this.responseCode = responseCode;
diff --git a/Tests/ClientTests.cs b/Tests/ClientTests.cs
index 967b724a..32949f50 100644
--- a/Tests/ClientTests.cs
+++ b/Tests/ClientTests.cs
@@ -78,11 +78,8 @@ public async void MetadataUpdateIsHandled()
var stream = Guid.NewGuid().ToString();
var testPassed = new TaskCompletionSource();
var clientParameters = new ClientParameters();
- clientParameters.OnMetadataUpdate += (update) =>
- {
- testPassed.SetResult(update);
- };
-
+ clientParameters.OnMetadataUpdate += (update) => { testPassed.SetResult(update); };
+
var client = await Client.Create(clientParameters);
await client.CreateStream(stream, new Dictionary());
Action> confirmed = (pubIds) => { };
@@ -110,11 +107,24 @@ public async void DeclarePublisherShouldReturnErrorCode()
var publisherRef = Guid.NewGuid().ToString();
var (publisherId, result) =
- await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored);
+ await client.DeclarePublisher(publisherRef, "this-stream-does-not-exist", confirmed, errored,
+ new ConnectionsPool(0, 1));
Assert.Equal(ResponseCode.StreamDoesNotExist, result.ResponseCode);
await client.Close("done");
}
+ [Fact]
+ public async void DeclareConsumerShouldReturnErrorCode()
+ {
+ var clientParameters = new ClientParameters { };
+ var client = await Client.Create(clientParameters);
+ var (subId, subscribeResponse) = await client.Subscribe(
+ "this-stream-does-not-exist", new OffsetTypeLast(), 1,
+ new Dictionary<string, string>(), null, null, new ConnectionsPool(0, 1));
+ Assert.Equal(ResponseCode.StreamDoesNotExist, subscribeResponse.ResponseCode);
+ await client.Close("done");
+ }
+
[Fact]
public async void PublishShouldError()
{
diff --git a/Tests/ConnectionsPoolTests.cs b/Tests/ConnectionsPoolTests.cs
index 9a82a9bf..f3db1706 100644
--- a/Tests/ConnectionsPoolTests.cs
+++ b/Tests/ConnectionsPoolTests.cs
@@ -539,6 +539,36 @@ await Assert.ThrowsAsync(async () => await RawConsumer.Creat
await client.Close("byte");
}
+ [Fact]
+ public async void ValidateTheRecycleIDs()
+ {
+ var client = await Client.Create(new ClientParameters() { });
+ const string Stream1 = "pool_test_stream_1_validate_ids";
+ await client.CreateStream(Stream1, new Dictionary<string, string>());
+
+ var pool = new ConnectionsPool(0, 50);
+ var metaDataInfo = await client.QueryMetadata(new[] { Stream1 });
+ var p = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
+ metaDataInfo.StreamInfos[Stream1]);
+
+ for (var i = 0; i < 30; i++)
+ {
+ var p1 = await RawProducer.Create(client.Parameters, new RawProducerConfig(Stream1) { Pool = pool },
+ metaDataInfo.StreamInfos[Stream1]);
+ Assert.Equal(1, pool.ConnectionsCount);
+ Assert.Equal(2, pool.ActiveIdsCountForStream(Stream1));
+ Assert.Equal(2, pool.ActiveIdsCount);
+ Assert.NotEmpty(ProducersIdsPerConnection(p1).ToList());
+ var l = ProducersIdsPerConnection(p1).ToList();
+ Assert.Equal(i + 1, l[1]);
+ await p1.Close();
+ }
+
+ await p.Close();
+ await client.DeleteStream(Stream1).ConfigureAwait(false);
+ await client.Close("byte");
+ }
+
///
/// In this test we need to check the pool consistency when there is an error during the creation of the producer or consumer
/// and close the pending connections in case the pool is full.
@@ -941,6 +971,27 @@ public void FindNextValidIdShouldReturnZeroGivenEmptyList()
Assert.Equal(0, missing);
}
+ [Fact]
+ public void FindNextValidIdShouldReturnTheSameIdGivenEmptyList()
+ {
+ var ids = new List<byte>();
+ var missing = ConnectionsPool.FindNextValidId(ids, 5);
+ Assert.Equal(5, missing);
+ }
+
+ [Fact]
+ public void FindNextValidIdShouldReturnTheNextGivenAStartId()
+ {
+ var ids = new List<byte>();
+ var id = ConnectionsPool.FindNextValidId(ids, 255);
+ ids.Add(id);
+ Assert.Equal(255, id);
+ var next = ConnectionsPool.FindNextValidId(ids);
+ ids.Add(next);
+ Assert.Equal(0, next);
+ Assert.Equal(1, ConnectionsPool.FindNextValidId(ids));
+ }
+
[Fact]
public void FindNextValidIdShouldReturnOne()
{
@@ -1010,5 +1061,53 @@ public void RecycleIdsWhenTheMaxIsReached()
Assert.Equal(7, nextValidId);
ids.Add(7);
}
+
+ // The ids need to stay consecutive even when there are missing ids
+ [Fact]
+ public void RecycleIdsWhenTheMaxIsReachedAndStartWithAnId()
+ {
+ var ids = new List<byte>()
+ {
+ 0,
+ 1,
+ 2,
+ // 3 is missing
+ 4,
+ // 5 is missing
+ 6,
+ // 7 is missing
+ 8,
+ 9
+ };
+ // even though there are missing ids, the next valid id starts from the given hint
+ var nextValidId = ConnectionsPool.FindNextValidId(ids, 200);
+ Assert.Equal(200, nextValidId);
+
+ // even though we start from 3, the next valid id is 10, since we continue after the highest id in use
+ nextValidId = ConnectionsPool.FindNextValidId(ids, 3);
+ ids.Add(nextValidId);
+ Assert.Equal(10, nextValidId);
+
+ nextValidId = ConnectionsPool.FindNextValidId(ids, 255);
+ ids.Add(nextValidId);
+ Assert.Equal(255, nextValidId);
+
+ nextValidId = ConnectionsPool.FindNextValidId(ids, 0);
+ ids.Add(nextValidId);
+ Assert.Equal(3, nextValidId);
+
+ nextValidId = ConnectionsPool.FindNextValidId(ids, 3);
+ ids.Add(nextValidId);
+ Assert.Equal(5, nextValidId);
+
+ nextValidId = ConnectionsPool.FindNextValidId(ids, 5);
+ ids.Add(nextValidId);
+ Assert.Equal(7, nextValidId);
+
+ nextValidId = ConnectionsPool.FindNextValidId(ids, 0);
+ ids.Add(nextValidId);
+ Assert.Equal(11, nextValidId);
+
+ }
}
}