diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 3aca7691..f009a1db 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -843,6 +843,13 @@ public async Task StreamExists(string stream) { var streams = new[] { stream }; var response = await QueryMetadata(streams).ConfigureAwait(false); + if (response.StreamInfos is { Count: >= 1 } && + response.StreamInfos[stream].ResponseCode == ResponseCode.StreamNotAvailable) + { + + ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream); + } + return response.StreamInfos is { Count: >= 1 } && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs index 2e117407..587b8269 100644 --- a/RabbitMQ.Stream.Client/ClientExceptions.cs +++ b/RabbitMQ.Stream.Client/ClientExceptions.cs @@ -30,12 +30,15 @@ internal static bool IsAKnownException(Exception exception) if (exception is AggregateException aggregateException) { var x = aggregateException.InnerExceptions.Select(x => - x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) || - x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException)); + x.GetType() == typeof(SocketException) || + x.GetType() == typeof(TimeoutException) || + x.GetType() == typeof(LeaderNotFoundException) || + x.GetType() == typeof(OperationCanceledException) || + x.GetType() == typeof(InvalidOperationException)); return x.Any(); } - return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) || + return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) || IsStreamNotAvailable(exception); } diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 81d8c633..2a445f65 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -154,6 +154,17 @@ internal async Task GetOrCreateClient(string brokerInfo, Func x.BrokerInfo == brokerInfo && x.Available); connectionItem.LastUsed = DateTime.UtcNow; + + if (connectionItem.Client is not { IsClosed: true }) + return connectionItem.Client; + + // the connection is closed + // let's remove it from the pool + Connections.TryRemove(connectionItem.Client.ClientId, out _); + // let's create a new one + connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection, await createClient().ConfigureAwait(false)); + Connections.TryAdd(connectionItem.Client.ClientId, connectionItem); + return connectionItem.Client; } @@ -174,7 +185,6 @@ internal async Task GetOrCreateClient(string brokerInfo, Func>, Action<(ulong, ResponseCode)[]>))> Publishers { get; } IDictionary Consumers { get; } + + public bool IsClosed { get; } } } diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index d8e8dee0..51e72634 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -7,6 +7,11 @@ namespace RabbitMQ.Stream.Client; +public interface ISuperStreamConsumer : IConsumer +{ + public Task ReconnectPartition(StreamInfo streamInfo); +} + public interface IConsumer : IClosable { public Task StoreOffset(ulong offset); @@ -42,8 +47,6 @@ public record IConsumerConfig : EntityCommonConfig, INamedEntity public string Reference { get; set; } - public Func ConnectionClosedHandler { get; set; } - public ConsumerFilter ConsumerFilter { get; set; } = null; // InitialCredits is the initial credits to be used for the consumer. diff --git a/RabbitMQ.Stream.Client/IProducer.cs b/RabbitMQ.Stream.Client/IProducer.cs index 01bde43b..264ab370 100644 --- a/RabbitMQ.Stream.Client/IProducer.cs +++ b/RabbitMQ.Stream.Client/IProducer.cs @@ -8,6 +8,10 @@ namespace RabbitMQ.Stream.Client; +public interface ISuperStreamProducer : IProducer +{ + public Task ReconnectPartition(StreamInfo streamInfo); +} // // Producer interface for sending messages to a stream. // There are different types of producers: @@ -83,7 +87,6 @@ public record ProducerFilter public record IProducerConfig : EntityCommonConfig, INamedEntity { - public string Reference { get; set; } public int MaxInFlight { get; set; } = 1_000; public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer"; diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index a76097d5..0c52b59d 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -335,8 +335,6 @@ RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.T RabbitMQ.Stream.Client.IConsumerConfig RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.get -> string RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.set -> void -RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.get -> System.Func -RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.get -> System.Func> RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.set -> void RabbitMQ.Stream.Client.IConsumerConfig.IsSingleActiveConsumer.get -> bool @@ -636,7 +634,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.init -> void RabbitMQ.Stream.Client.Reliable.ProducerFactory -RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ProducerFactory.ProducerFactory() -> void RabbitMQ.Stream.Client.Reliable.ProducerFactory._confirmationPipe -> RabbitMQ.Stream.Client.Reliable.ConfirmationPipe RabbitMQ.Stream.Client.Reliable.ProducerFactory._producerConfig -> RabbitMQ.Stream.Client.Reliable.ProducerConfig @@ -723,9 +720,7 @@ RabbitMQ.Stream.Client.StreamSystem RabbitMQ.Stream.Client.StreamSystem.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateRawConsumer(RabbitMQ.Stream.Client.RawConsumerConfig rawConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateRawProducer(RabbitMQ.Stream.Client.RawProducerConfig rawProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task -RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateStream(RabbitMQ.Stream.Client.StreamSpec spec) -> System.Threading.Tasks.Task -RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.DeleteStream(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.IsClosed.get -> bool RabbitMQ.Stream.Client.StreamSystem.QueryOffset(string reference, string stream) -> System.Threading.Tasks.Task @@ -776,8 +771,6 @@ static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.C static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.SequenceReader reader, uint len) -> RabbitMQ.Stream.Client.Message -static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IConsumer -static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IProducer static RabbitMQ.Stream.Client.Reliable.Consumer.Create(RabbitMQ.Stream.Client.Reliable.ConsumerConfig consumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.Reliable.Producer.Create(RabbitMQ.Stream.Client.Reliable.ProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.StreamCompressionCodecs.GetCompressionCodec(RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 73e640d4..9fa27676 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -2,6 +2,7 @@ abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string +abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CreateNewEntity(bool boot) -> System.Threading.Tasks.Task const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort override RabbitMQ.Stream.Client.Broker.ToString() -> string @@ -97,6 +98,7 @@ RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Me RabbitMQ.Stream.Client.IClient.ClientId.get -> string RabbitMQ.Stream.Client.IClient.ClientId.init -> void RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary +RabbitMQ.Stream.Client.IClient.IsClosed.get -> bool RabbitMQ.Stream.Client.IClient.Publishers.get -> System.Collections.Generic.IDictionary>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))> RabbitMQ.Stream.Client.IClosable RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task @@ -139,6 +141,10 @@ RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void RabbitMQ.Stream.Client.IRouting RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.Broker metaInfoBroker, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> +RabbitMQ.Stream.Client.ISuperStreamConsumer +RabbitMQ.Stream.Client.ISuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.ISuperStreamProducer +RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.KeyRoutingStrategy RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func routingKeyExtractor, System.Func> routingKeyQFunc, string superStream) -> void RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> @@ -159,11 +165,19 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Coll RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int RabbitMQ.Stream.Client.PublishFilter.Write(System.Span span) -> int RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo +RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func +RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo +RabbitMQ.Stream.Client.RawSuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.get -> System.Func +RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo +RabbitMQ.Stream.Client.RawSuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get -> System.Func +RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy @@ -189,10 +203,9 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void -RabbitMQ.Stream.Client.Reliable.ReliableBase.CheckIfStreamIsAvailable(string stream, RabbitMQ.Stream.Client.StreamSystem system) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool -RabbitMQ.Stream.Client.Reliable.ReliableBase.IsValidStatus() -> bool -RabbitMQ.Stream.Client.Reliable.ReliableBase.MaybeReconnect() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy @@ -230,6 +243,9 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.G RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary statistic) -> void RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void @@ -245,6 +261,8 @@ static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, Sy static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence seq, uint len) -> RabbitMQ.Stream.Client.Message static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer +static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index b963b372..bb8fc1d5 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -109,6 +109,8 @@ internal void Validate() public string Stream { get; } public Func MessageHandler { get; set; } + + public Func ConnectionClosedHandler { get; set; } } public class RawConsumer : AbstractEntity, IConsumer, IDisposable @@ -141,7 +143,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger = logger ?? NullLogger.Instance; _initialCredits = config.InitialCredits; _config = config; - Logger.LogDebug("Creating... {ConsumerInfo}", DumpEntityConfiguration()); + Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference); // _chunksBuffer is a channel that is used to buffer the chunks _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) @@ -470,7 +472,7 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // }, Token); } - internal async Task Init() + private async Task Init() { _config.Validate(); @@ -619,8 +621,8 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => // we call the Close to re-enter to the standard behavior // ignoreIfClosed is an optimization to avoid to send the unsubscribe _config.Pool.RemoveConsumerEntityFromStream(_client.ClientId, EntityId, _config.Stream); + await Shutdown(_config, true).ConfigureAwait(false); _config.MetadataHandler?.Invoke(metaDataUpdate); - await Close().ConfigureAwait(false); }; private Client.ConnectionCloseHandler OnConnectionClosed() => diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index 90a6ca83..74a1b3e3 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -57,7 +57,7 @@ public class RawProducer : AbstractEntity, IProducer, IDisposable protected override string GetStream() => _config.Stream; - protected override string DumpEntityConfiguration() + protected sealed override string DumpEntityConfiguration() { return $"Producer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}," + @@ -88,6 +88,8 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu _client = client; _config = config; Info = new ProducerInfo(_config.Stream, _config.Reference); + Logger = logger ?? NullLogger.Instance; + Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); _messageBuffer = Channel.CreateBounded(new BoundedChannelOptions(10000) { AllowSynchronousContinuations = false, @@ -95,7 +97,6 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }); - Logger = logger ?? NullLogger.Instance; Task.Run(ProcessBuffer); _semaphore = new SemaphoreSlim(config.MaxInFlight, config.MaxInFlight); } @@ -164,15 +165,14 @@ private async Task Init() private Client.ConnectionCloseHandler OnConnectionClosed() => async (reason) => { + _client.ConnectionClosed -= OnConnectionClosed(); + _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); _config.Pool.Remove(_client.ClientId); - await Shutdown(_config, true).ConfigureAwait(false); + UpdateStatusToClosed(); if (_config.ConnectionClosedHandler != null) { await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); } - - // remove the event since the connection is closed - _client.ConnectionClosed -= OnConnectionClosed(); }; private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => @@ -187,14 +187,13 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => _client.ConnectionClosed -= OnConnectionClosed(); _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); - _config.Pool.RemoveProducerEntityFromStream(_client.ClientId, EntityId, _config.Stream); - // at this point the server has removed the producer from the list // and the DeletePublisher producer is not needed anymore (ignoreIfClosed = true) // we call the Close to re-enter to the standard behavior // ignoreIfClosed is an optimization to avoid to send the DeletePublisher - _config.MetadataHandler?.Invoke(metaDataUpdate); + _config.Pool.RemoveProducerEntityFromStream(_client.ClientId, EntityId, _config.Stream); await Shutdown(_config, true).ConfigureAwait(false); + _config.MetadataHandler?.Invoke(metaDataUpdate); }; private bool IsFilteringEnabled => _config.Filter is { FilterValue: not null }; @@ -364,7 +363,7 @@ private async Task ProcessBuffer() } catch (Exception e) { - Logger.LogError(e, "error while Process Buffer"); + Logger.LogError(e, "{DumpEntityConfiguration} Error while Process Buffer", DumpEntityConfiguration()); } } diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index 19c06618..f4f2b9ad 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -12,7 +13,7 @@ namespace RabbitMQ.Stream.Client; -public class RawSuperStreamConsumer : IConsumer, IDisposable +public class RawSuperStreamConsumer : ISuperStreamConsumer, IDisposable { // ConcurrentDictionary because the consumer can be closed from another thread // The send operations will check if the producer exists and if not it will be created @@ -20,6 +21,7 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable private bool _disposed; private readonly RawSuperStreamConsumerConfig _config; + private readonly SemaphoreSlim _semaphoreSlim = new(1, 1); // Contains the info about the streams (one per partition) private readonly IDictionary _streamInfos; @@ -34,7 +36,7 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable /// /// /// - public static IConsumer Create( + public static ISuperStreamConsumer Create( RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, IDictionary streamInfos, ClientParameters clientParameters, @@ -61,7 +63,6 @@ private RawSuperStreamConsumer( } // We need to copy the config from the super consumer to the standard consumer - private RawConsumerConfig FromStreamConfig(string stream) { return new RawConsumerConfig(stream) @@ -73,21 +74,32 @@ private RawConsumerConfig FromStreamConfig(string stream) ConsumerFilter = _config.ConsumerFilter, Pool = _config.Pool, Crc32 = _config.Crc32, - ConnectionClosedHandler = async (s) => + ConnectionClosedHandler = async (reason) => { - // if the stream is still in the consumer list - // means that the consumer was not closed voluntarily - // and it is needed to recreate it. - // The stream will be removed from the list when the consumer is closed - if (_consumers.ContainsKey(stream)) + _consumers.TryRemove(stream, out var consumer); + if (reason == ConnectionClosedReason.Normal) { - _logger.LogInformation( - "Consumer {ConsumerReference} is disconnected from {StreamIdentifier}. Client will try reconnect", - _config.Reference, + _logger.LogDebug( + "Super Stream consumer {@ConsumerInfo} is closed normally from {StreamIdentifier}", + consumer?.Info, stream ); - _consumers.TryRemove(stream, out _); - await GetConsumer(stream).ConfigureAwait(false); + } + else + { + _logger.LogWarning( + "Super Stream consumer {@ConsumerInfo} is disconnected from {StreamIdentifier} reason: {Reason}", + consumer?.Info, + stream, reason + ); + } + + consumer?.Dispose(); + _streamInfos.Remove(stream); + + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); } }, MessageHandler = async (consumer, context, message) => @@ -95,6 +107,7 @@ private RawConsumerConfig FromStreamConfig(string stream) // in the message handler we need to add also the source stream // since there could be multiple streams (one per partition) // it is useful client side to know from which stream the message is coming from + _config.OffsetSpec[stream] = new OffsetTypeOffset(context.Offset); if (_config.MessageHandler != null) { await _config.MessageHandler(stream, consumer, context, message).ConfigureAwait(false); @@ -102,55 +115,12 @@ private RawConsumerConfig FromStreamConfig(string stream) }, MetadataHandler = async update => { - // In case of stream update we remove the producer from the list - // We hide the behavior of the producer to the user - // if needed the connection will be created again - // we "should" always have the stream - - // but we have to handle the case¬ - // We need to wait a bit it can take some time to update the configuration - Thread.Sleep(500); - - _streamInfos.Remove(update.Stream); _consumers.TryRemove(update.Stream, out var consumer); - - // this check is needed only for an edge case - // when the system is closed and the connections for the steam are still open for - // some reason. So if the Client IsClosed we can't operate on it - if (_config.Client.IsClosed || _disposed) - { - return; - } - - var exists = await _config.Client.StreamExists(update.Stream).ConfigureAwait(false); - if (!exists) - { - // The stream doesn't exist anymore - // but this condition should be avoided since the hash routing - // can be compromised - _logger.LogWarning("SuperStream Consumer. Stream {StreamIdentifier} is not available anymore", - update.Stream); - consumer?.Close(); - - } - else + consumer?.Close(); + _streamInfos.Remove(update.Stream); + if (_config.MetadataHandler != null) { - await Task.Run(async () => - { - // this is an edge case when the user remove a replica for the stream - // s0 the topology is changed and the consumer is disconnected - // this is why in this case we need to query the QueryMetadata again - // most of the time this code is not executed - _logger.LogInformation( - "Consumer: {ConsumerReference}. Metadata update for stream {StreamIdentifier}. Client will try reconnect", - _config.Reference, - update.Stream - ); - var x = await _config.Client.QueryMetadata(new[] { update.Stream }).ConfigureAwait(false); - x.StreamInfos.TryGetValue(update.Stream, out var streamInfo); - _streamInfos.Add(update.Stream, streamInfo); - await GetConsumer(update.Stream).ConfigureAwait(false); - }).ConfigureAwait(false); + await _config.MetadataHandler(update).ConfigureAwait(false); } }, OffsetSpec = _config.OffsetSpec.TryGetValue(stream, out var value) ? value : new OffsetTypeNext(), @@ -159,30 +129,47 @@ await Task.Run(async () => private async Task InitConsumer(string stream) { + var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }) + .First(i => i.Item == stream).Index; - var c = await RawConsumer.Create(_clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName }, + var c = await RawConsumer.Create( + _clientParameters with { ClientProvidedName = $"{_clientParameters.ClientProvidedName}_{index}" }, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false); - _logger?.LogDebug("Consumer {ConsumerReference} created for Stream {StreamIdentifier}", _config.Reference, + _logger?.LogDebug("Super stream consumer {ConsumerReference} created for Stream {StreamIdentifier}", c.Info, stream); return c; } - private async Task GetConsumer(string stream) + private async Task MaybeAddConsumer(string stream) { if (!_consumers.ContainsKey(stream)) { var p = await InitConsumer(stream).ConfigureAwait(false); _consumers.TryAdd(stream, p); } - - return _consumers[stream]; } private async Task StartConsumers() { foreach (var stream in _streamInfos.Keys) { - await GetConsumer(stream).ConfigureAwait(false); + await MaybeAddConsumer(stream).ConfigureAwait(false); + } + } + + public async Task ReconnectPartition(StreamInfo streamInfo) + { + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + _consumers.TryRemove(streamInfo.Stream, out var consumer); + consumer?.Dispose(); + _streamInfos.TryAdd(streamInfo.Stream, streamInfo); // add the new stream infos + await MaybeAddConsumer(streamInfo.Stream).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); } } @@ -248,6 +235,7 @@ public RawSuperStreamConsumerConfig(string superStream) /// public Func MessageHandler { get; set; } + public Func ConnectionClosedHandler { get; set; } public string SuperStream { get; } internal Client Client { get; set; } diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 23cc2fb1..ddcd3a5c 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -23,7 +23,7 @@ namespace RabbitMQ.Stream.Client; /// When a message is sent to a stream, the producer will be selected based on the stream name and the partition key. /// SuperStreamProducer uses lazy initialization for the producers, when it starts there are no producers until the first message is sent. /// -public class RawSuperStreamProducer : IProducer, IDisposable +public class RawSuperStreamProducer : ISuperStreamProducer, IDisposable { private bool _disposed; @@ -39,11 +39,14 @@ public class RawSuperStreamProducer : IProducer, IDisposable // For example: // invoices(super_stream) -> invoices-0, invoices-1, invoices-2 // Streams contains the configuration for each stream but not the connection + private readonly IDictionary _streamInfos; + private readonly ClientParameters _clientParameters; private readonly ILogger _logger; + private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); - public static IProducer Create( + public static ISuperStreamProducer Create( RawSuperStreamProducerConfig rawSuperStreamProducerConfig, IDictionary streamInfos, ClientParameters clientParameters, @@ -89,37 +92,33 @@ private RawProducerConfig FromStreamConfig(string stream) MaxInFlight = _config.MaxInFlight, Filter = _config.Filter, Pool = _config.Pool, - ConnectionClosedHandler = (s) => + ConnectionClosedHandler = async (reason) => { - // In case of connection closed, we need to remove the producer from the list - // We hide the behavior of the producer to the user - // if needed the connection will be created again - _producers.TryRemove(stream, out _); - return Task.CompletedTask; + _producers.TryGetValue(stream, out var producer); + if (reason == ConnectionClosedReason.Normal) + { + _logger.LogDebug("Super Stream producer {@ProducerInfo} is closed normally", producer?.Info); + } + else + { + _logger.LogWarning( + "Super Stream producer {@ProducerInfo} is disconnected from {StreamIdentifier} reason: {Reason}", + producer?.Info, + stream, reason + ); + } + + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); + } }, - MetadataHandler = update => + MetadataHandler = async update => { - // In case of stream update we remove the producer from the list - // We hide the behavior of the producer to the user - // if needed the connection will be created again - // we "should" always have the stream - - // but we have to handle the case¬ - // We need to wait a bit it can take some time to update the configuration - Thread.Sleep(500); - - var exists = _config.Client.StreamExists(update.Stream); - if (!exists.Result) + if (_config.MetadataHandler != null) { - // The stream doesn't exist anymore - // but this condition should be avoided since the hash routing - // can be compromised - _logger.LogWarning("Stream {StreamIdentifier} is not available anymore", update.Stream); - _streamInfos.Remove(update.Stream); + await _config.MetadataHandler(update).ConfigureAwait(false); } - - _producers.TryRemove(update.Stream, out var producer); - return Task.CompletedTask; }, ClientProvidedName = _config.ClientProvidedName, BatchSize = _config.BatchSize, @@ -130,17 +129,20 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var p = await RawProducer.Create(_clientParameters with { ClientProvidedName = _config.ClientProvidedName }, + var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }) + .First(i => i.Item == stream).Index; + var p = await RawProducer.Create( + _clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" }, FromStreamConfig(stream), _streamInfos[stream], _logger) .ConfigureAwait(false); - _logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference, + _logger?.LogDebug("Super stream producer {@ProducerReference} created for Stream {StreamIdentifier}", p.Info, stream); return p; } - protected void ThrowIfClosed() + private void ThrowIfClosed() { if (!IsOpen()) { @@ -148,7 +150,7 @@ protected void ThrowIfClosed() } } - private async Task GetProducer(string stream) + private async Task MaybeAddAndGetProducer(string stream) { if (!_producers.ContainsKey(stream)) { @@ -159,6 +161,22 @@ private async Task GetProducer(string stream) return _producers[stream]; } + public async Task ReconnectPartition(StreamInfo streamInfo) + { + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + _producers.TryRemove(streamInfo.Stream, out var producer); + producer?.Close(); + _streamInfos[streamInfo.Stream] = streamInfo; // add the new stream infos + await MaybeAddAndGetProducer(streamInfo.Stream).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } + } + // based on the stream name and the partition key, we select the producer private async Task GetProducerForMessage(Message message) { @@ -177,14 +195,30 @@ private async Task GetProducerForMessage(Message message) throw new RouteNotFoundException("No route found for the message to any stream"); } - return await GetProducer(routes[0]).ConfigureAwait(false); + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + return await MaybeAddAndGetProducer(routes[0]).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } } public async ValueTask Send(ulong publishingId, Message message) { ThrowIfClosed(); var producer = await GetProducerForMessage(message).ConfigureAwait(false); - await producer.Send(publishingId, message).ConfigureAwait(false); + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + await producer.Send(publishingId, message).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } } public async ValueTask Send(List<(ulong, Message)> messages) @@ -209,9 +243,17 @@ public async ValueTask Send(List<(ulong, Message)> messages) } } - foreach (var (producer, list) in aggregate) + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try { - await producer.Send(list).ConfigureAwait(false); + foreach (var (producer, list) in aggregate) + { + await producer.Send(list).ConfigureAwait(false); + } + } + finally + { + _semaphoreSlim.Release(); } } @@ -236,11 +278,19 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } } - // Here we send the messages to the right producer - // sub aggregate is a list of messages that have to be sent to the same producer - foreach (var (producer, messages) in aggregate) + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try { - await producer.Send(publishingId, messages, compressionType).ConfigureAwait(false); + // Here we send the messages to the right producer + // sub aggregate is a list of messages that have to be sent to the same producer + foreach (var (producer, messages) in aggregate) + { + await producer.Send(publishingId, messages, compressionType).ConfigureAwait(false); + } + } + finally + { + _semaphoreSlim.Release(); } } @@ -263,7 +313,7 @@ public Task GetLastPublishingId() { foreach (var stream in _streamInfos.Keys.ToList()) { - GetProducer(stream).Wait(); + MaybeAddAndGetProducer(stream).Wait(); } var v = _producers.Values.Min(p => p.GetLastPublishingId().Result); @@ -280,7 +330,7 @@ public void Dispose() { foreach (var (_, iProducer) in _producers) { - iProducer.Close(); + iProducer.Dispose(); } _disposed = true; @@ -325,6 +375,8 @@ public RawSuperStreamProducerConfig(string superStream) public RoutingStrategyType RoutingStrategyType { get; set; } = RoutingStrategyType.Hash; + public Func ConnectionClosedHandler { get; set; } + internal Client Client { get; set; } } diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs index cb23e1c4..62d7d0d9 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs @@ -125,7 +125,7 @@ private async void OnTimedEvent(object sender, ElapsedEventArgs e) foreach (var pair in timedOutMessages) { - await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null) + await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, pair.Value.Stream) .ConfigureAwait(false); } } diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index 6d3dd058..eedc273e 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -157,7 +157,6 @@ public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSys /// public class Consumer : ConsumerFactory { - private readonly ILogger _logger; protected override ILogger BaseLogger => _logger; @@ -174,13 +173,12 @@ public static async Task Create(ConsumerConfig consumerConfig, ILogger consumerConfig.ReconnectStrategy ??= new BackOffReconnectStrategy(logger); consumerConfig.ResourceAvailableReconnectStrategy ??= new ResourceAvailableBackOffReconnectStrategy(logger); var rConsumer = new Consumer(consumerConfig, logger); - await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy).ConfigureAwait(false); - logger?.LogDebug("Consumer: {Reference} created for Stream: {Stream}", - consumerConfig.Reference, consumerConfig.Stream); + await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy) + .ConfigureAwait(false); return rConsumer; } - internal override async Task CreateNewEntity(bool boot) + protected override async Task CreateNewEntity(bool boot) { _consumer = await CreateConsumer(boot).ConfigureAwait(false); await _consumerConfig.ReconnectStrategy.WhenConnected(ToString()).ConfigureAwait(false); @@ -189,7 +187,7 @@ internal override async Task CreateNewEntity(bool boot) // just close the consumer. See base/metadataupdate protected override async Task CloseEntity() { - await SemaphoreSlim.WaitAsync(Consts.LongWait).ConfigureAwait(false); + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { if (_consumer != null) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index a3f41fef..f9a6b10a 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -43,6 +43,10 @@ private async Task StandardConsumer(bool boot) offsetSpec = new OffsetTypeOffset(_lastOffsetConsumed[_consumerConfig.Stream] + 1); } + // before creating a new consumer, the old one is disposed + // This is just a safety check, the consumer should be already disposed + _consumer?.Dispose(); + return await _consumerConfig.StreamSystem.CreateRawConsumer(new RawConsumerConfig(_consumerConfig.Stream) { ClientProvidedName = _consumerConfig.ClientProvidedName, @@ -87,9 +91,9 @@ private async Task SuperConsumer(bool boot) // it can restart consuming from the last consumer offset + 1 (+1 since we need to consume from the next) if (!boot && _consumedFirstTime) { - foreach (var (stream, offset) in _lastOffsetConsumed) + foreach (var (streamOff, offset) in _lastOffsetConsumed) { - offsetSpecs[stream] = new OffsetTypeOffset(offset + 1); + offsetSpecs[streamOff] = new OffsetTypeOffset(offset + 1); } } else @@ -103,27 +107,50 @@ private async Task SuperConsumer(bool boot) } } - return await _consumerConfig.StreamSystem.CreateSuperStreamConsumer( - new RawSuperStreamConsumerConfig(_consumerConfig.Stream) - { - ClientProvidedName = _consumerConfig.ClientProvidedName, - Reference = _consumerConfig.Reference, - ConsumerUpdateListener = _consumerConfig.ConsumerUpdateListener, - IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer, - InitialCredits = _consumerConfig.InitialCredits, - ConsumerFilter = _consumerConfig.Filter, - Crc32 = _consumerConfig.Crc32, - OffsetSpec = offsetSpecs, - MessageHandler = async (stream, consumer, ctx, message) => + if (boot) + { + return await _consumerConfig.StreamSystem.CreateSuperStreamConsumer( + new RawSuperStreamConsumerConfig(_consumerConfig.Stream) { - _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; - if (_consumerConfig.MessageHandler != null) + ClientProvidedName = _consumerConfig.ClientProvidedName, + Reference = _consumerConfig.Reference, + ConsumerUpdateListener = _consumerConfig.ConsumerUpdateListener, + IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer, + InitialCredits = _consumerConfig.InitialCredits, + ConsumerFilter = _consumerConfig.Filter, + Crc32 = _consumerConfig.Crc32, + OffsetSpec = offsetSpecs, + ConnectionClosedHandler = async (closeReason, partitionStream) => + { + if (closeReason == ConnectionClosedReason.Normal) + { + BaseLogger.LogInformation("{Identity} is closed normally", ToString()); + return; + } + + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; + await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r) + .ConfigureAwait(false); + }, + MetadataHandler = async update => + { + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; + await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r) + .ConfigureAwait(false); + }, + MessageHandler = async (partitionStream, consumer, ctx, message) => { - await _consumerConfig.MessageHandler(stream, consumer, ctx, - message).ConfigureAwait(false); - } - }, - }, BaseLogger).ConfigureAwait(false); + _consumedFirstTime = true; + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; + if (_consumerConfig.MessageHandler != null) + { + await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, + message).ConfigureAwait(false); + } + }, + }, BaseLogger).ConfigureAwait(false); + } + + return _consumer; } } diff --git a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs index 57118097..92dbbf53 100644 --- a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs @@ -48,7 +48,7 @@ public BackOffReconnectStrategy(ILogger logger = null) // else the backoff will be too long private void MaybeResetTentatives() { - if (Tentatives > 1000) + if (Tentatives > 5) { Tentatives = 1; } @@ -56,6 +56,7 @@ private void MaybeResetTentatives() public async ValueTask WhenDisconnected(string connectionIdentifier) { + Tentatives <<= 1; _logger.LogInformation( "{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms", @@ -65,6 +66,18 @@ public async ValueTask WhenDisconnected(string connectionIdentifier) await Task.Delay(TimeSpan.FromMilliseconds(Tentatives * 100)).ConfigureAwait(false); MaybeResetTentatives(); return true; + // this will be in another commit + // Tentatives <<= 1; + // var next = Random.Shared.Next(Tentatives * 1000, Tentatives * 2000); + // _logger.LogInformation( + // "{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms", + // connectionIdentifier, + // next + // ); + // + // await Task.Delay(TimeSpan.FromMilliseconds(next)).ConfigureAwait(false); + // MaybeResetTentatives(); + // return true; } public ValueTask WhenConnected(string connectionIdentifier) @@ -105,13 +118,13 @@ public async ValueTask WhenDisconnected(string resourceIdentifier) ); await Task.Delay(TimeSpan.FromSeconds(Tentatives)).ConfigureAwait(false); MaybeResetTentatives(); - return Tentatives < 4; + return Tentatives < 5; } public ValueTask WhenConnected(string resourceIdentifier) { Tentatives = 1; - _logger.LogInformation("{ResourceIdentifier} is available", resourceIdentifier); + _logger.LogInformation("{ResourceIdentifier}", resourceIdentifier); return ValueTask.CompletedTask; } } diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index e62cd368..1fcb0536 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -125,7 +125,7 @@ public ProducerConfig(StreamSystem streamSystem, string stream) : base(streamSys /// public class Producer : ProducerFactory { - private IProducer _producer; + private ulong _publishingId; private readonly ILogger _logger; @@ -163,18 +163,12 @@ public static async Task Create(ProducerConfig producerConfig, ILogger var rProducer = new Producer(producerConfig, logger); await rProducer.Init(producerConfig.ReconnectStrategy, producerConfig.ResourceAvailableReconnectStrategy) .ConfigureAwait(false); - logger?.LogDebug( - "Producer: {Reference} created for Stream: {Stream}", - producerConfig.Reference, - producerConfig.Stream - ); - return rProducer; } - internal override async Task CreateNewEntity(bool boot) + protected override async Task CreateNewEntity(bool boot) { - _producer = await CreateProducer().ConfigureAwait(false); + _producer = await CreateProducer(boot).ConfigureAwait(false); await _producerConfig.ReconnectStrategy.WhenConnected(ToString()).ConfigureAwait(false); @@ -191,7 +185,7 @@ internal override async Task CreateNewEntity(bool boot) protected override async Task CloseEntity() { - await SemaphoreSlim.WaitAsync(Consts.LongWait).ConfigureAwait(false); + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { if (_producer != null) @@ -214,7 +208,7 @@ public override async Task Close() } UpdateStatus(ReliableEntityStatus.Closed); - await SemaphoreSlim.WaitAsync(Consts.ShortWait).ConfigureAwait(false); + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { _confirmationPipe.Stop(); diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 3964c78a..557797dd 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -1,6 +1,7 @@ // This source code is dual-licensed under the Apache License, version // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -15,52 +16,81 @@ namespace RabbitMQ.Stream.Client.Reliable; public abstract class ProducerFactory : ReliableBase { + protected IProducer _producer; protected ProducerConfig _producerConfig; protected ConfirmationPipe _confirmationPipe; - protected async Task CreateProducer() + protected async Task CreateProducer(bool boot) { if (_producerConfig.SuperStreamConfig is { Enabled: true }) { - return await SuperStreamProducer().ConfigureAwait(false); + return await SuperStreamProducer(boot).ConfigureAwait(false); } return await StandardProducer().ConfigureAwait(false); } - private async Task SuperStreamProducer() + private async Task SuperStreamProducer(bool boot) { - return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer( - new RawSuperStreamProducerConfig(_producerConfig.Stream) - { - ClientProvidedName = _producerConfig.ClientProvidedName, - Reference = _producerConfig.Reference, - MessagesBufferSize = _producerConfig.MessagesBufferSize, - MaxInFlight = _producerConfig.MaxInFlight, - Routing = _producerConfig.SuperStreamConfig.Routing, - RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType, - Filter = _producerConfig.Filter, - ConfirmHandler = confirmationHandler => + if (boot) + { + + return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer( + new RawSuperStreamProducerConfig(_producerConfig.Stream) { - var (stream, confirmation) = confirmationHandler; - var confirmationStatus = confirmation.Code switch + ClientProvidedName = _producerConfig.ClientProvidedName, + Reference = _producerConfig.Reference, + MessagesBufferSize = _producerConfig.MessagesBufferSize, + MaxInFlight = _producerConfig.MaxInFlight, + Routing = _producerConfig.SuperStreamConfig.Routing, + RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType, + Filter = _producerConfig.Filter, + ConnectionClosedHandler = async (closeReason, partitionStream) => { - ResponseCode.PublisherDoesNotExist => ConfirmationStatus.PublisherDoesNotExist, - ResponseCode.AccessRefused => ConfirmationStatus.AccessRefused, - ResponseCode.InternalError => ConfirmationStatus.InternalError, - ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, - ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, - ResponseCode.Ok => ConfirmationStatus.Confirmed, - _ => ConfirmationStatus.UndefinedError - }; - _confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId, - stream).ConfigureAwait(false); - } - }, BaseLogger).ConfigureAwait(false); + if (closeReason == ConnectionClosedReason.Normal) + { + BaseLogger.LogDebug("{Identity} is closed normally", ToString()); + return; + } + + var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; + await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r) + .ConfigureAwait(false); + }, + MetadataHandler = async update => + { + var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; + await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r) + .ConfigureAwait(false); + }, + ConfirmHandler = confirmationHandler => + { + var (stream, confirmation) = confirmationHandler; + var confirmationStatus = confirmation.Code switch + { + ResponseCode.PublisherDoesNotExist => ConfirmationStatus.PublisherDoesNotExist, + ResponseCode.AccessRefused => ConfirmationStatus.AccessRefused, + ResponseCode.InternalError => ConfirmationStatus.InternalError, + ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, + ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, + ResponseCode.Ok => ConfirmationStatus.Confirmed, + _ => ConfirmationStatus.UndefinedError + }; + _confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId, + stream).ConfigureAwait(false); + } + }, BaseLogger).ConfigureAwait(false); + } + + return _producer; } private async Task StandardProducer() { + // before creating a new producer, the old one is disposed + // This is just a safety check, the producer should be already disposed + _producer?.Dispose(); + return await _producerConfig.StreamSystem.CreateRawProducer(new RawProducerConfig(_producerConfig.Stream) { ClientProvidedName = _producerConfig.ClientProvidedName, @@ -75,7 +105,7 @@ private async Task StandardProducer() { if (closeReason == ConnectionClosedReason.Normal) { - BaseLogger.LogInformation("{Identity} is closed normally", ToString()); + BaseLogger.LogDebug("{Identity} is closed normally", ToString()); return; } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index 921f1a8c..bb885514 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -66,7 +66,7 @@ protected bool CompareStatus(ReliableEntityStatus toTest) } } - protected bool IsValidStatus() + private bool IsValidStatus() { lock (_lock) { @@ -103,27 +103,20 @@ private async Task MaybeInit(bool boot) { if (boot) { + BaseLogger.LogError("{Identity} Error during the first boot {EMessage}", + ToString(), e.Message); // if it is the first boot we don't need to reconnect UpdateStatus(ReliableEntityStatus.Closed); throw; } - reconnect = ClientExceptions.IsAKnownException(e); - + reconnect = true; LogException(e); - if (!reconnect) - { - // We consider the client as closed - // since the exception is raised to the caller - UpdateStatus(ReliableEntityStatus.Closed); - throw; - } + } if (reconnect) - { await MaybeReconnect().ConfigureAwait(false); - } } // @@ -134,10 +127,9 @@ private async Task Init(bool boot) { if (!boot && !IsValidStatus()) { - BaseLogger.LogInformation("{Identity} is already closed", ToString()); + BaseLogger.LogDebug("{Identity} is already closed. The init will be skipped", ToString()); return; } - // each time that the client is initialized, we need to reset the status // if we hare here it means that the entity is not open for some reason like: // first time initialization or reconnect due of a IsAKnownException @@ -159,11 +151,18 @@ private async Task Init(bool boot) /// If it is the First boot for the reliable P/C /// Called by Init method /// - internal abstract Task CreateNewEntity(bool boot); + protected abstract Task CreateNewEntity(bool boot); - protected async Task CheckIfStreamIsAvailable(string stream, StreamSystem system) + /// + /// When the clients receives a meta data update, it doesn't know + /// If the stream exists or not. It just knows that the stream topology has changed. + /// the method CheckIfStreamIsAvailable checks if the stream exists. + /// + /// stream name + /// stream system + /// + private async Task CheckIfStreamIsAvailable(string stream, StreamSystem system) { - await Task.Delay(Consts.RandomMid()).ConfigureAwait(false); var exists = false; var tryAgain = true; @@ -172,7 +171,9 @@ protected async Task CheckIfStreamIsAvailable(string stream, StreamSystem try { exists = await system.StreamExists(stream).ConfigureAwait(false); - await _resourceAvailableReconnectStrategy.WhenConnected(stream).ConfigureAwait(false); + var available = exists ? "available" : "not available"; + await _resourceAvailableReconnectStrategy.WhenConnected($"{stream} is {available}") + .ConfigureAwait(false); break; } catch (Exception e) @@ -182,29 +183,32 @@ protected async Task CheckIfStreamIsAvailable(string stream, StreamSystem } } - if (!exists) - { - // In this case the stream doesn't exist anymore - // the Entity is just closed. - BaseLogger.LogInformation( - "Meta data update stream: {StreamIdentifier}. The stream doesn't exist anymore {Identity} will be closed", - stream, - ToString() - ); - } + if (exists) + return true; + // In this case the stream doesn't exist anymore or it failed to check if the stream exists + // too many tentatives for the reconnection strategy + // the Entity is just closed. + var msg = tryAgain ? "The stream doesn't exist anymore" : "Failed to check if the stream exists"; + + BaseLogger.LogInformation( + "Meta data update stream: {StreamIdentifier}. {Msg} {Identity} will be closed", + stream, msg, + ToString() + ); - return exists; + return false; } // /// Try to reconnect to the broker /// Based on the retry strategy -// - protected async Task MaybeReconnect() + // + private async Task MaybeReconnect() { var reconnect = await _reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false); if (!reconnect) { + BaseLogger.LogDebug("{Identity} is closed due of reconnect strategy", ToString()); UpdateStatus(ReliableEntityStatus.Closed); return; } @@ -212,38 +216,46 @@ protected async Task MaybeReconnect() switch (IsOpen()) { case true: - await TryToReconnect().ConfigureAwait(false); + UpdateStatus(ReliableEntityStatus.Reconnecting); + await MaybeInit(false).ConfigureAwait(false); break; case false: if (CompareStatus(ReliableEntityStatus.Reconnecting)) { - BaseLogger.LogInformation("{Identity} is in Reconnecting", ToString()); + BaseLogger.LogDebug("{Identity} is in Reconnecting", ToString()); } break; } } - /// - /// Try to reconnect to the broker - /// - private async Task TryToReconnect() + private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info, Func reconnectPartitionFunc) { - UpdateStatus(ReliableEntityStatus.Reconnecting); - await MaybeInit(false).ConfigureAwait(false); + var reconnect = await _reconnectStrategy + .WhenDisconnected($"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false); + + if (!reconnect) + { + BaseLogger.LogDebug("{Identity} partition is closed due of reconnect strategy", ToString()); + UpdateStatus(ReliableEntityStatus.Closed); + return; + } + + try + { + UpdateStatus(ReliableEntityStatus.Reconnecting); + await reconnectPartitionFunc(streamInfo).ConfigureAwait(false); + UpdateStatus(ReliableEntityStatus.Open); + await _reconnectStrategy.WhenConnected( + $"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false); + } + catch (Exception e) + { + LogException(e); + await MaybeReconnectPartition(streamInfo, info, reconnectPartitionFunc).ConfigureAwait(false); + } } - /// - /// When the clients receives a meta data update, it doesn't know - /// the reason. - /// Metadata update can be raised when: - /// - stream is deleted - /// - change the stream topology (ex: add a follower) - /// - /// HandleMetaDataMaybeReconnect checks if the stream still exists - /// and try to reconnect. - /// (internal because it is needed for tests) - /// private void LogException(Exception exception) { const string KnownExceptionTemplate = "{Identity} trying to reconnect due to exception {Err}"; @@ -267,6 +279,42 @@ private void LogException(Exception exception) /// protected abstract Task CloseEntity(); + /// + /// Handle the partition reconnection in case of super stream entity + /// + /// Stream System + /// Partition Stream + /// Function to reconnect the partition + internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectPartitionFunc) + { + var streamExists = false; + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + streamExists = await CheckIfStreamIsAvailable(stream, system) + .ConfigureAwait(false); + if (streamExists) + { + var streamInfo = await system.StreamInfo(stream).ConfigureAwait(false); + await MaybeReconnectPartition(streamInfo, ToString(), reconnectPartitionFunc).ConfigureAwait(false); + } + } + finally + { + SemaphoreSlim.Release(); + } + + if (!streamExists) + { + await Close().ConfigureAwait(false); + } + } + + /// + /// Handle the regular stream reconnection + /// + /// Stream system + /// Stream internal async Task OnEntityClosed(StreamSystem system, string stream) { var streamExists = false; diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index 230bbdd1..9a22a717 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -170,16 +170,17 @@ public static async Task LookupRandomConnection(ClientParameters client { var brokers = new List() { metaDataInfo.Leader }; brokers.AddRange(metaDataInfo.Replicas); - // brokers.Sort((_, _) => Random.Shared.Next(-1, 1)); - var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); var exceptions = new List(); + var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); + foreach (var broker in br) { try { return await pool.GetOrCreateClient(broker.ToString(), async () => - await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), logger) + await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), + logger) .ConfigureAwait(false)).ConfigureAwait(false); } catch (Exception ex) diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 17b038a8..9a7ee8ec 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -162,7 +162,7 @@ private static void CheckLeader(StreamInfo metaStreamInfo) } } - public async Task CreateRawSuperStreamProducer( + public async Task CreateRawSuperStreamProducer( RawSuperStreamProducerConfig rawSuperStreamProducerConfig, ILogger logger = null) { await MayBeReconnectLocator().ConfigureAwait(false); @@ -222,7 +222,7 @@ public async Task QueryPartition(string superStream) return partitions.Streams; } - public async Task CreateSuperStreamConsumer( + public async Task CreateSuperStreamConsumer( RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, ILogger logger = null) { @@ -284,8 +284,6 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf var p = await RawProducer.Create(s, rawProducerConfig, metaStreamInfo, logger).ConfigureAwait(false); - _logger?.LogDebug("Raw Producer: {Reference} created for Stream: {Stream}", - rawProducerConfig.Reference, rawProducerConfig.Stream); return p; } finally @@ -294,7 +292,7 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf } } - private async Task StreamInfo(string streamName) + public async Task StreamInfo(string streamName) { // force localhost connection for single node clusters and when address resolver is not provided // when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer @@ -350,7 +348,15 @@ public async Task CreateStream(StreamSpec spec) public async Task StreamExists(string stream) { await MayBeReconnectLocator().ConfigureAwait(false); - return await _client.StreamExists(stream).ConfigureAwait(false); + await _semClientProvidedName.WaitAsync().ConfigureAwait(false); + try + { + return await _client.StreamExists(stream).ConfigureAwait(false); + } + finally + { + _semClientProvidedName.Release(); + } } private static void MaybeThrowQueryException(string reference, string stream) diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs index 52cd20c4..c92242e3 100644 --- a/Tests/RawConsumerSystemTests.cs +++ b/Tests/RawConsumerSystemTests.cs @@ -621,7 +621,7 @@ public async void ConsumerMetadataHandlerUpdate() SystemUtils.Wait(); await system.DeleteStream(stream); new Utils(testOutputHelper).WaitUntilTaskCompletes(testPassed); - Assert.False(((RawConsumer)rawConsumer).IsOpen()); + SystemUtils.WaitUntil(() => ((RawConsumer)rawConsumer).IsOpen() == false); await rawConsumer.Close(); await system.Close(); } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index c1896063..d7fa108e 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -505,7 +505,7 @@ internal FakeThrowExceptionConsumer(ConsumerConfig consumerConfig, Exception exc _exceptionType = exceptionType; } - internal override Task CreateNewEntity(bool boot) + protected override Task CreateNewEntity(bool boot) { if (!_firstTime) { diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index 17cb3f04..68e87e5c 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -32,19 +32,26 @@ public async void NumberOfConnectionsShouldBeEqualsToThePartitions() { SystemUtils.ResetSuperStreams(); var system = await StreamSystem.Create(new StreamSystemConfig()); - var connectionName = Guid.NewGuid().ToString(); + var clientProvidedName = Guid.NewGuid().ToString(); var consumer = await system.CreateSuperStreamConsumer( new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { - ClientProvidedName = connectionName, + ClientProvidedName = clientProvidedName, OffsetSpec = await SystemUtils.OffsetsForSuperStreamConsumer(system, "invoices", new OffsetTypeFirst()) }); Assert.NotNull(consumer); SystemUtils.Wait(); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(connectionName).Result == 3); + + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); Assert.Equal(ResponseCode.Ok, await consumer.Close()); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await system.Close(); } @@ -113,15 +120,72 @@ public async void RemoveOneConnectionIfaStreamIsDeleted() Assert.NotNull(consumer); SystemUtils.Wait(); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 3); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); SystemUtils.HttpDeleteQueue(SystemUtils.InvoicesStream0); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 2); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); SystemUtils.HttpDeleteQueue(SystemUtils.InvoicesStream1); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); await consumer.Close(); // in this case we don't have any connection anymore since the super stream consumer is closed - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + Assert.Equal(ResponseCode.Ok, await consumer.Close()); + await system.Close(); + } + + [Fact] + public async void SingleConsumerReconnectInCaseOfKillingConnection() + { + SystemUtils.ResetSuperStreams(); + var system = await StreamSystem.Create(new StreamSystemConfig()); + var clientProvidedName = Guid.NewGuid().ToString(); + + var configuration = new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) + { + ClientProvidedName = clientProvidedName, + OffsetSpec = + await SystemUtils.OffsetsForSuperStreamConsumer(system, "invoices", new OffsetTypeFirst()) + }; + + var consumer = await system.CreateSuperStreamConsumer(configuration); + var completed = new TaskCompletionSource(); + configuration.ConnectionClosedHandler = async (reason, stream) => + { + if (reason == ConnectionClosedReason.Unexpected) + { + SystemUtils.Wait(); + await consumer.ReconnectPartition( + await system.StreamInfo(stream).ConfigureAwait(false) + ); + SystemUtils.Wait(); + completed.SetResult(true); + } + }; + + Assert.NotNull(consumer); + SystemUtils.Wait(); + + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1); + + completed.Task.Wait(); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); Assert.Equal(ResponseCode.Ok, await consumer.Close()); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); await system.Close(); } @@ -210,7 +274,7 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished const int NumberOfMessages = 20; var system = await StreamSystem.Create(new StreamSystemConfig()); var publishToSuperStreamTask = - SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); + SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper); if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(20000)) != publishToSuperStreamTask) { Assert.Fail("timeout waiting to publish messages"); @@ -382,7 +446,9 @@ Func> consumerUpdateListener SystemUtils.Wait(TimeSpan.FromSeconds(2)); // we kill the connections of the first super stream consumer ( so 3 connections one per stream) - SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProvidedName).Result == 3); + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_2").Result == 1); SystemUtils.Wait(TimeSpan.FromSeconds(3)); // at this point the second consumer should be active and consume the messages // and the consumerUpdateListener should be called and the offset should be restored @@ -399,7 +465,10 @@ Func> consumerUpdateListener } // just to be sure that the connections are killed - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await consumerSingle.Close(); await system.Close(); } diff --git a/Tests/SuperStreamProducerTests.cs b/Tests/SuperStreamProducerTests.cs index fd7a3cc6..47249ef3 100644 --- a/Tests/SuperStreamProducerTests.cs +++ b/Tests/SuperStreamProducerTests.cs @@ -143,7 +143,6 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste { Routing = message1 => message1.Properties.MessageId.ToString(), Reference = "reference", - }); Assert.True(streamProducer.MessagesSent == 0); Assert.True(streamProducer.ConfirmFrames == 0); @@ -421,16 +420,26 @@ public async void SendMessageToSuperStreamRecreateConnectionsIfKilled() SystemUtils.ResetSuperStreams(); // This test validates that the super stream producer is able to recreate the connection // if the connection is killed - // It is NOT meant to test the availability of the super stream producer - // just the reconnect mechanism + // we use the connection closed handler to recreate the connection + // the method ReconnectPartition is used to reconnect the partition var system = await StreamSystem.Create(new StreamSystemConfig()); var clientName = Guid.NewGuid().ToString(); - var streamProducer = - await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange) - { - Routing = message1 => message1.Properties.MessageId.ToString(), - ClientProvidedName = clientName - }); + + var c = new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange) + { + Routing = message1 => message1.Properties.MessageId.ToString(), + ClientProvidedName = clientName, + }; + var completed = new TaskCompletionSource(); + var streamProducer = await system.CreateRawSuperStreamProducer(c); + c.ConnectionClosedHandler = async (reason, stream) => + { + var streamInfo = await system.StreamInfo(stream); + await streamProducer.ReconnectPartition(streamInfo); + SystemUtils.Wait(); + completed.SetResult(true); + }; + for (ulong i = 0; i < 20; i++) { var message = new Message(Encoding.Default.GetBytes("hello")) @@ -440,8 +449,8 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste if (i == 10) { - SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientName).Result == 3); - // We just decide to close the connections + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientName}_0").Result == 1); + completed.Task.Wait(); } // Here the connection _must_ be recreated and the send the message @@ -453,7 +462,7 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste // according to the routing strategy hello{i} that must be the correct routing SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 9); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("invoices-2") == 4); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); Assert.True(await streamProducer.Close() == ResponseCode.Ok); await system.Close(); } @@ -533,7 +542,7 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste } } }); - for (ulong i = 0; i < 10; i++) + for (ulong i = 0; i < 20; i++) { var message = new Message(Encoding.Default.GetBytes("hello")) { @@ -549,12 +558,21 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste } Thread.Sleep(200); - - await streamProducer.Send(i, message); + try + { + await streamProducer.Send(i, message); + } + catch (Exception e) + { + Assert.True(e is AlreadyClosedException); + } } SystemUtils.Wait(); new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + // even we removed a stream the producer should be able to send messages and maintain the hash routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); Assert.True(await streamProducer.Close() == ResponseCode.Ok); await system.Close(); } @@ -750,13 +768,29 @@ public async void ReliableProducerSendMessageConnectionsIfKilled() // just the reconnect mechanism var system = await StreamSystem.Create(new StreamSystemConfig()); var clientName = Guid.NewGuid().ToString(); + var testPassed = new TaskCompletionSource() { }; + var received = 0; + var error = 0; var streamProducer = await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange) { - SuperStreamConfig = new SuperStreamConfig() + SuperStreamConfig = + new SuperStreamConfig() { Routing = message1 => message1.Properties.MessageId.ToString() }, + TimeoutMessageAfter = TimeSpan.FromSeconds(1), + ConfirmationHandler = async confirmation => { - Routing = message1 => message1.Properties.MessageId.ToString() + if (confirmation.Status != ConfirmationStatus.Confirmed) + { + Interlocked.Increment(ref error); + } + + if (Interlocked.Increment(ref received) == 20) + { + testPassed.SetResult(true); + } + + await Task.CompletedTask; }, - ClientProvidedName = clientName + ClientProvidedName = clientName, }); for (ulong i = 0; i < 20; i++) { @@ -764,14 +798,11 @@ public async void ReliableProducerSendMessageConnectionsIfKilled() { Properties = new Properties() { MessageId = $"hello{i}" } }; - if (i == 10) { - SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientName).Result == 3); // We just decide to close the connections - // we just wait a bit to be sure that the connections - // will be re-opened - SystemUtils.Wait(TimeSpan.FromSeconds(1)); + // The messages will go in time out since not confirmed + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientName}_0").Result == 1); } // Here the connection _must_ be recreated and the message sent @@ -779,11 +810,13 @@ public async void ReliableProducerSendMessageConnectionsIfKilled() } SystemUtils.Wait(); - // Total messages must be 20 - // according to the routing strategy hello{i} that must be the correct routing - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 9); + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + // killed the connection for the InvoicesStream0. So received + error must be 9 + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) + error == 9); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); + + await streamProducer.Close(); await system.Close(); } } diff --git a/Tests/UnitTests.cs b/Tests/UnitTests.cs index 34dabc26..7e109f02 100644 --- a/Tests/UnitTests.cs +++ b/Tests/UnitTests.cs @@ -32,10 +32,12 @@ public Task Close(string reason) } public IDictionary Consumers { get; } + public bool IsClosed { get; } public FakeClient(ClientParameters clientParameters) { Parameters = clientParameters; + IsClosed = false; Publishers = new Dictionary>, Action<(ulong, ResponseCode)[]>))>(); Consumers = new Dictionary(); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index de8d2d24..49f2194d 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -200,13 +200,10 @@ public static async Task PublishMessagesSuperStream(StreamSystem system, string Routing = message1 => message1.Properties.MessageId.ToString(), ConfirmHandler = _ => { - count++; - if (count != numberOfMessages) + if (Interlocked.Increment(ref count) == numberOfMessages) { - return; + testPassed.SetResult(count); } - - testPassed.SetResult(count); } });