From c4cf4db976cc401ba237eb40da4dde5ecb939c1e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 12 Jan 2024 10:28:17 +0100 Subject: [PATCH 1/9] Improve the super stream reconnection Fixes https://github.com/rabbitmq/rabbitmq-stream-dotnet-client/discussions/333 Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Client.cs | 7 ++ RabbitMQ.Stream.Client/IConsumer.cs | 2 - RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 2 - .../PublicAPI.Unshipped.txt | 10 +- RabbitMQ.Stream.Client/RawConsumer.cs | 5 +- .../RawSuperStreamConsumer.cs | 108 +++++++----------- RabbitMQ.Stream.Client/Reliable/Consumer.cs | 6 +- .../Reliable/ConsumerFactory.cs | 97 ++++++++++++---- .../Reliable/IReconnectStrategy.cs | 4 +- RabbitMQ.Stream.Client/Reliable/Producer.cs | 2 +- .../Reliable/ReliableBase.cs | 53 ++++++--- RabbitMQ.Stream.Client/StreamSystem.cs | 10 +- Tests/RawConsumerSystemTests.cs | 2 +- Tests/ReliableTests.cs | 2 +- 14 files changed, 189 insertions(+), 121 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 3aca7691..f009a1db 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -843,6 +843,13 @@ public async Task StreamExists(string stream) { var streams = new[] { stream }; var response = await QueryMetadata(streams).ConfigureAwait(false); + if (response.StreamInfos is { Count: >= 1 } && + response.StreamInfos[stream].ResponseCode == ResponseCode.StreamNotAvailable) + { + + ClientExceptions.MaybeThrowException(ResponseCode.StreamNotAvailable, stream); + } + return response.StreamInfos is { Count: >= 1 } && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index d8e8dee0..354c2bc8 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -42,8 +42,6 @@ public record IConsumerConfig : EntityCommonConfig, INamedEntity public string Reference { get; set; } - public Func ConnectionClosedHandler { get; set; } - public ConsumerFilter ConsumerFilter { get; set; } = null; // InitialCredits is the initial credits to be used for the consumer. diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index a76097d5..6ffcb1f3 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -335,8 +335,6 @@ RabbitMQ.Stream.Client.IConsumer.StoreOffset(ulong offset) -> System.Threading.T RabbitMQ.Stream.Client.IConsumerConfig RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.get -> string RabbitMQ.Stream.Client.IConsumerConfig.ClientProvidedName.set -> void -RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.get -> System.Func -RabbitMQ.Stream.Client.IConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.get -> System.Func> RabbitMQ.Stream.Client.IConsumerConfig.ConsumerUpdateListener.set -> void RabbitMQ.Stream.Client.IConsumerConfig.IsSingleActiveConsumer.get -> bool diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 73e640d4..abc8ca62 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -2,6 +2,7 @@ abstract RabbitMQ.Stream.Client.AbstractEntity.Close() -> System.Threading.Tasks abstract RabbitMQ.Stream.Client.AbstractEntity.DeleteEntityFromTheServer(bool ignoreIfAlreadyDeleted = false) -> System.Threading.Tasks.Task abstract RabbitMQ.Stream.Client.AbstractEntity.DumpEntityConfiguration() -> string abstract RabbitMQ.Stream.Client.AbstractEntity.GetStream() -> string +abstract RabbitMQ.Stream.Client.Reliable.ReliableBase.CreateNewEntity(bool boot) -> System.Threading.Tasks.Task const RabbitMQ.Stream.Client.RouteQueryResponse.Key = 24 -> ushort const RabbitMQ.Stream.Client.StreamStatsResponse.Key = 28 -> ushort override RabbitMQ.Stream.Client.Broker.ToString() -> string @@ -159,9 +160,14 @@ RabbitMQ.Stream.Client.PublishFilter.PublishFilter(byte publisherId, System.Coll RabbitMQ.Stream.Client.PublishFilter.SizeNeeded.get -> int RabbitMQ.Stream.Client.PublishFilter.Write(System.Span span) -> int RabbitMQ.Stream.Client.RawConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo +RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.get -> System.Func +RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo +RabbitMQ.Stream.Client.RawSuperStreamConsumer.ReconnectPartition(string stream) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.get -> System.Func +RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType @@ -189,11 +195,11 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void -RabbitMQ.Stream.Client.Reliable.ReliableBase.CheckIfStreamIsAvailable(string stream, RabbitMQ.Stream.Client.StreamSystem system) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool RabbitMQ.Stream.Client.Reliable.ReliableBase.IsValidStatus() -> bool -RabbitMQ.Stream.Client.Reliable.ReliableBase.MaybeReconnect() -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.Reliable.ReliableBase.LogException(System.Exception exception) -> void RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void +RabbitMQ.Stream.Client.Reliable.ReliableBase._reconnectStrategy -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index b963b372..c10d6009 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -109,6 +109,8 @@ internal void Validate() public string Stream { get; } public Func MessageHandler { get; set; } + + public Func ConnectionClosedHandler { get; set; } } public class RawConsumer : AbstractEntity, IConsumer, IDisposable @@ -619,8 +621,9 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => // we call the Close to re-enter to the standard behavior // ignoreIfClosed is an optimization to avoid to send the unsubscribe _config.Pool.RemoveConsumerEntityFromStream(_client.ClientId, EntityId, _config.Stream); - _config.MetadataHandler?.Invoke(metaDataUpdate); await Close().ConfigureAwait(false); + + _config.MetadataHandler?.Invoke(metaDataUpdate); }; private Client.ConnectionCloseHandler OnConnectionClosed() => diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index 19c06618..ea2626dd 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -20,9 +20,10 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable private bool _disposed; private readonly RawSuperStreamConsumerConfig _config; + private readonly SemaphoreSlim _semaphoreSlim = new(1, 1); // Contains the info about the streams (one per partition) - private readonly IDictionary _streamInfos; + private readonly ConcurrentDictionary _streamInfos; private readonly ClientParameters _clientParameters; private readonly ILogger _logger; @@ -52,7 +53,12 @@ private RawSuperStreamConsumer( ) { _config = config; - _streamInfos = streamInfos; + _streamInfos = new ConcurrentDictionary(streamInfos); + foreach (var keyValuePair in streamInfos) + { + _streamInfos.TryAdd(keyValuePair.Key, keyValuePair.Value); + } + _clientParameters = clientParameters; _logger = logger ?? NullLogger.Instance; Info = new ConsumerInfo(_config.SuperStream, _config.Reference); @@ -61,7 +67,6 @@ private RawSuperStreamConsumer( } // We need to copy the config from the super consumer to the standard consumer - private RawConsumerConfig FromStreamConfig(string stream) { return new RawConsumerConfig(stream) @@ -73,21 +78,23 @@ private RawConsumerConfig FromStreamConfig(string stream) ConsumerFilter = _config.ConsumerFilter, Pool = _config.Pool, Crc32 = _config.Crc32, - ConnectionClosedHandler = async (s) => + ConnectionClosedHandler = async (reason) => { - // if the stream is still in the consumer list - // means that the consumer was not closed voluntarily - // and it is needed to recreate it. - // The stream will be removed from the list when the consumer is closed - if (_consumers.ContainsKey(stream)) + _consumers.TryRemove(stream, out var consumer); + if (consumer != null) { _logger.LogInformation( - "Consumer {ConsumerReference} is disconnected from {StreamIdentifier}. Client will try reconnect", + "Consumer {ConsumerReference} is disconnected from {StreamIdentifier} reason {Reason}", _config.Reference, - stream + stream, reason ); - _consumers.TryRemove(stream, out _); - await GetConsumer(stream).ConfigureAwait(false); + + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); + } + + // await GetConsumer(stream).ConfigureAwait(false); } }, MessageHandler = async (consumer, context, message) => @@ -95,6 +102,7 @@ private RawConsumerConfig FromStreamConfig(string stream) // in the message handler we need to add also the source stream // since there could be multiple streams (one per partition) // it is useful client side to know from which stream the message is coming from + _config.OffsetSpec[stream] = new OffsetTypeOffset(context.Offset); if (_config.MessageHandler != null) { await _config.MessageHandler(stream, consumer, context, message).ConfigureAwait(false); @@ -102,55 +110,11 @@ private RawConsumerConfig FromStreamConfig(string stream) }, MetadataHandler = async update => { - // In case of stream update we remove the producer from the list - // We hide the behavior of the producer to the user - // if needed the connection will be created again - // we "should" always have the stream - - // but we have to handle the case¬ - // We need to wait a bit it can take some time to update the configuration - Thread.Sleep(500); - - _streamInfos.Remove(update.Stream); _consumers.TryRemove(update.Stream, out var consumer); - - // this check is needed only for an edge case - // when the system is closed and the connections for the steam are still open for - // some reason. So if the Client IsClosed we can't operate on it - if (_config.Client.IsClosed || _disposed) + consumer?.Close(); + if (_config.MetadataHandler != null) { - return; - } - - var exists = await _config.Client.StreamExists(update.Stream).ConfigureAwait(false); - if (!exists) - { - // The stream doesn't exist anymore - // but this condition should be avoided since the hash routing - // can be compromised - _logger.LogWarning("SuperStream Consumer. Stream {StreamIdentifier} is not available anymore", - update.Stream); - consumer?.Close(); - - } - else - { - await Task.Run(async () => - { - // this is an edge case when the user remove a replica for the stream - // s0 the topology is changed and the consumer is disconnected - // this is why in this case we need to query the QueryMetadata again - // most of the time this code is not executed - _logger.LogInformation( - "Consumer: {ConsumerReference}. Metadata update for stream {StreamIdentifier}. Client will try reconnect", - _config.Reference, - update.Stream - ); - var x = await _config.Client.QueryMetadata(new[] { update.Stream }).ConfigureAwait(false); - x.StreamInfos.TryGetValue(update.Stream, out var streamInfo); - _streamInfos.Add(update.Stream, streamInfo); - await GetConsumer(update.Stream).ConfigureAwait(false); - }).ConfigureAwait(false); + await _config.MetadataHandler(update).ConfigureAwait(false); } }, OffsetSpec = _config.OffsetSpec.TryGetValue(stream, out var value) ? value : new OffsetTypeNext(), @@ -159,23 +123,21 @@ await Task.Run(async () => private async Task InitConsumer(string stream) { - - var c = await RawConsumer.Create(_clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName }, + var c = await RawConsumer.Create( + _clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName }, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false); _logger?.LogDebug("Consumer {ConsumerReference} created for Stream {StreamIdentifier}", _config.Reference, stream); return c; } - private async Task GetConsumer(string stream) + private async Task GetConsumer(string stream) { if (!_consumers.ContainsKey(stream)) { var p = await InitConsumer(stream).ConfigureAwait(false); _consumers.TryAdd(stream, p); } - - return _consumers[stream]; } private async Task StartConsumers() @@ -186,6 +148,21 @@ private async Task StartConsumers() } } + public async Task ReconnectPartition(string stream) + { + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + _consumers.TryRemove(stream, out var consumer); + consumer?.Close(); + await GetConsumer(stream).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } + } + /// /// It is not possible to close store the offset here since the consumer is not aware of the stream /// you need to use the consumer inside the MessageHandler to store the offset @@ -248,6 +225,7 @@ public RawSuperStreamConsumerConfig(string superStream) /// public Func MessageHandler { get; set; } + public Func ConnectionClosedHandler { get; set; } public string SuperStream { get; } internal Client Client { get; set; } diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index 6d3dd058..af118fea 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -157,7 +157,6 @@ public ConsumerConfig(StreamSystem streamSystem, string stream) : base(streamSys /// public class Consumer : ConsumerFactory { - private readonly ILogger _logger; protected override ILogger BaseLogger => _logger; @@ -174,13 +173,14 @@ public static async Task Create(ConsumerConfig consumerConfig, ILogger consumerConfig.ReconnectStrategy ??= new BackOffReconnectStrategy(logger); consumerConfig.ResourceAvailableReconnectStrategy ??= new ResourceAvailableBackOffReconnectStrategy(logger); var rConsumer = new Consumer(consumerConfig, logger); - await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy).ConfigureAwait(false); + await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy) + .ConfigureAwait(false); logger?.LogDebug("Consumer: {Reference} created for Stream: {Stream}", consumerConfig.Reference, consumerConfig.Stream); return rConsumer; } - internal override async Task CreateNewEntity(bool boot) + protected override async Task CreateNewEntity(bool boot) { _consumer = await CreateConsumer(boot).ConfigureAwait(false); await _consumerConfig.ReconnectStrategy.WhenConnected(ToString()).ConfigureAwait(false); diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index a3f41fef..f13e71d7 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -2,6 +2,7 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. +using System; using System.Collections.Concurrent; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -87,9 +88,9 @@ private async Task SuperConsumer(bool boot) // it can restart consuming from the last consumer offset + 1 (+1 since we need to consume from the next) if (!boot && _consumedFirstTime) { - foreach (var (stream, offset) in _lastOffsetConsumed) + foreach (var (streamOff, offset) in _lastOffsetConsumed) { - offsetSpecs[stream] = new OffsetTypeOffset(offset + 1); + offsetSpecs[streamOff] = new OffsetTypeOffset(offset + 1); } } else @@ -103,27 +104,79 @@ private async Task SuperConsumer(bool boot) } } - return await _consumerConfig.StreamSystem.CreateSuperStreamConsumer( - new RawSuperStreamConsumerConfig(_consumerConfig.Stream) - { - ClientProvidedName = _consumerConfig.ClientProvidedName, - Reference = _consumerConfig.Reference, - ConsumerUpdateListener = _consumerConfig.ConsumerUpdateListener, - IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer, - InitialCredits = _consumerConfig.InitialCredits, - ConsumerFilter = _consumerConfig.Filter, - Crc32 = _consumerConfig.Crc32, - OffsetSpec = offsetSpecs, - MessageHandler = async (stream, consumer, ctx, message) => + if (boot) + { + return await _consumerConfig.StreamSystem.CreateSuperStreamConsumer( + new RawSuperStreamConsumerConfig(_consumerConfig.Stream) { - _consumedFirstTime = true; - _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; - if (_consumerConfig.MessageHandler != null) + ClientProvidedName = _consumerConfig.ClientProvidedName, + Reference = _consumerConfig.Reference, + ConsumerUpdateListener = _consumerConfig.ConsumerUpdateListener, + IsSingleActiveConsumer = _consumerConfig.IsSingleActiveConsumer, + InitialCredits = _consumerConfig.InitialCredits, + ConsumerFilter = _consumerConfig.Filter, + Crc32 = _consumerConfig.Crc32, + OffsetSpec = offsetSpecs, + ConnectionClosedHandler = async (closeReason, partitionStream) => + { + if (closeReason == ConnectionClosedReason.Normal) + { + BaseLogger.LogInformation("{Identity} is closed normally", ToString()); + return; + } + + await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, + _ => MaybeReconnectPartition(partitionStream)) + .ConfigureAwait(false); + }, + MetadataHandler = async update => + { + await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, + _ => MaybeReconnectPartition(update.Stream)) + .ConfigureAwait(false); + }, + MessageHandler = async (partitionStream, consumer, ctx, message) => { - await _consumerConfig.MessageHandler(stream, consumer, ctx, - message).ConfigureAwait(false); - } - }, - }, BaseLogger).ConfigureAwait(false); + _consumedFirstTime = true; + _lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset; + if (_consumerConfig.MessageHandler != null) + { + await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, + message).ConfigureAwait(false); + } + }, + }, BaseLogger).ConfigureAwait(false); + } + + return _consumer; + + async Task MaybeReconnectPartition(string stream) + { + var reconnect = await _reconnectStrategy + .WhenDisconnected($"Super Stream partition: {stream} for {_consumer.Info}").ConfigureAwait(false); + + if (!reconnect) + { + UpdateStatus(ReliableEntityStatus.Closed); + return; + } + + try + { + UpdateStatus(ReliableEntityStatus.Reconnecting); + await ((RawSuperStreamConsumer)_consumer)!.ReconnectPartition(stream).ConfigureAwait(false); + UpdateStatus(ReliableEntityStatus.Open); + await _reconnectStrategy.WhenConnected( + $"Super Stream partition: {stream} for {_consumer.Info}").ConfigureAwait(false); + } + catch (Exception e) + { + LogException(e); + if (ClientExceptions.IsAKnownException(e)) + { + await MaybeReconnectPartition(stream).ConfigureAwait(false); + } + } + } } } diff --git a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs index 57118097..932cae4a 100644 --- a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs @@ -105,13 +105,13 @@ public async ValueTask WhenDisconnected(string resourceIdentifier) ); await Task.Delay(TimeSpan.FromSeconds(Tentatives)).ConfigureAwait(false); MaybeResetTentatives(); - return Tentatives < 4; + return Tentatives < 5; } public ValueTask WhenConnected(string resourceIdentifier) { Tentatives = 1; - _logger.LogInformation("{ResourceIdentifier} is available", resourceIdentifier); + _logger.LogInformation("{ResourceIdentifier}", resourceIdentifier); return ValueTask.CompletedTask; } } diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index e62cd368..744e132c 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -172,7 +172,7 @@ await rProducer.Init(producerConfig.ReconnectStrategy, producerConfig.ResourceAv return rProducer; } - internal override async Task CreateNewEntity(bool boot) + protected override async Task CreateNewEntity(bool boot) { _producer = await CreateProducer().ConfigureAwait(false); diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index 921f1a8c..8b977737 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -76,7 +76,7 @@ protected bool IsValidStatus() } protected abstract ILogger BaseLogger { get; } - private IReconnectStrategy _reconnectStrategy; + protected IReconnectStrategy _reconnectStrategy; private IReconnectStrategy _resourceAvailableReconnectStrategy; internal async Task Init(IReconnectStrategy reconnectStrategy, @@ -159,11 +159,10 @@ private async Task Init(bool boot) /// If it is the First boot for the reliable P/C /// Called by Init method /// - internal abstract Task CreateNewEntity(bool boot); + protected abstract Task CreateNewEntity(bool boot); - protected async Task CheckIfStreamIsAvailable(string stream, StreamSystem system) + private async Task CheckIfStreamIsAvailable(string stream, StreamSystem system) { - await Task.Delay(Consts.RandomMid()).ConfigureAwait(false); var exists = false; var tryAgain = true; @@ -172,7 +171,9 @@ protected async Task CheckIfStreamIsAvailable(string stream, StreamSystem try { exists = await system.StreamExists(stream).ConfigureAwait(false); - await _resourceAvailableReconnectStrategy.WhenConnected(stream).ConfigureAwait(false); + var available = exists ? "available" : "not available"; + await _resourceAvailableReconnectStrategy.WhenConnected($"{stream} is {available}") + .ConfigureAwait(false); break; } catch (Exception e) @@ -199,8 +200,8 @@ protected async Task CheckIfStreamIsAvailable(string stream, StreamSystem // /// Try to reconnect to the broker /// Based on the retry strategy -// - protected async Task MaybeReconnect() + // + private async Task MaybeReconnect() { var reconnect = await _reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false); if (!reconnect) @@ -212,7 +213,8 @@ protected async Task MaybeReconnect() switch (IsOpen()) { case true: - await TryToReconnect().ConfigureAwait(false); + UpdateStatus(ReliableEntityStatus.Reconnecting); + await MaybeInit(false).ConfigureAwait(false); break; case false: if (CompareStatus(ReliableEntityStatus.Reconnecting)) @@ -224,15 +226,6 @@ protected async Task MaybeReconnect() } } - /// - /// Try to reconnect to the broker - /// - private async Task TryToReconnect() - { - UpdateStatus(ReliableEntityStatus.Reconnecting); - await MaybeInit(false).ConfigureAwait(false); - } - /// /// When the clients receives a meta data update, it doesn't know /// the reason. @@ -244,7 +237,7 @@ private async Task TryToReconnect() /// and try to reconnect. /// (internal because it is needed for tests) /// - private void LogException(Exception exception) + protected void LogException(Exception exception) { const string KnownExceptionTemplate = "{Identity} trying to reconnect due to exception {Err}"; const string UnknownExceptionTemplate = "{Identity} received an exception during initialization"; @@ -267,6 +260,30 @@ private void LogException(Exception exception) /// protected abstract Task CloseEntity(); + internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectFunc) + { + var streamExists = false; + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + streamExists = await CheckIfStreamIsAvailable(stream, system) + .ConfigureAwait(false); + if (streamExists) + { + await reconnectFunc(stream).ConfigureAwait(false); + } + } + finally + { + SemaphoreSlim.Release(); + } + + if (!streamExists) + { + await Close().ConfigureAwait(false); + } + } + internal async Task OnEntityClosed(StreamSystem system, string stream) { var streamExists = false; diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 17b038a8..88a501d5 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -350,7 +350,15 @@ public async Task CreateStream(StreamSpec spec) public async Task StreamExists(string stream) { await MayBeReconnectLocator().ConfigureAwait(false); - return await _client.StreamExists(stream).ConfigureAwait(false); + await _semClientProvidedName.WaitAsync().ConfigureAwait(false); + try + { + return await _client.StreamExists(stream).ConfigureAwait(false); + } + finally + { + _semClientProvidedName.Release(); + } } private static void MaybeThrowQueryException(string reference, string stream) diff --git a/Tests/RawConsumerSystemTests.cs b/Tests/RawConsumerSystemTests.cs index 52cd20c4..c92242e3 100644 --- a/Tests/RawConsumerSystemTests.cs +++ b/Tests/RawConsumerSystemTests.cs @@ -621,7 +621,7 @@ public async void ConsumerMetadataHandlerUpdate() SystemUtils.Wait(); await system.DeleteStream(stream); new Utils(testOutputHelper).WaitUntilTaskCompletes(testPassed); - Assert.False(((RawConsumer)rawConsumer).IsOpen()); + SystemUtils.WaitUntil(() => ((RawConsumer)rawConsumer).IsOpen() == false); await rawConsumer.Close(); await system.Close(); } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index c1896063..d7fa108e 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -505,7 +505,7 @@ internal FakeThrowExceptionConsumer(ConsumerConfig consumerConfig, Exception exc _exceptionType = exceptionType; } - internal override Task CreateNewEntity(bool boot) + protected override Task CreateNewEntity(bool boot) { if (!_firstTime) { From fa9bfba37c068de3110f9118acf8f1acb1d88b46 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Sun, 14 Jan 2024 21:12:44 +0100 Subject: [PATCH 2/9] Add events to the super stream raw consumer Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/ConnectionsPool.cs | 10 +++ RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 1 - .../PublicAPI.Unshipped.txt | 6 ++ RabbitMQ.Stream.Client/RawConsumer.cs | 3 +- RabbitMQ.Stream.Client/RawProducer.cs | 6 +- .../RawSuperStreamConsumer.cs | 8 +- .../RawSuperStreamProducer.cs | 86 +++++++++++-------- RabbitMQ.Stream.Client/Reliable/Consumer.cs | 2 +- .../Reliable/ConsumerFactory.cs | 40 ++------- .../Reliable/IReconnectStrategy.cs | 8 +- RabbitMQ.Stream.Client/Reliable/Producer.cs | 8 +- .../Reliable/ProducerFactory.cs | 83 ++++++++++++------ .../Reliable/ReliableBase.cs | 35 +++++++- RabbitMQ.Stream.Client/RoutingClient.cs | 1 - 14 files changed, 179 insertions(+), 118 deletions(-) diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 81d8c633..8e0c3a87 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -154,6 +154,16 @@ internal async Task GetOrCreateClient(string brokerInfo, Func x.BrokerInfo == brokerInfo && x.Available); connectionItem.LastUsed = DateTime.UtcNow; + if ((Client)(connectionItem.Client) is not {IsClosed: true}) return connectionItem.Client; + + // the connection is closed + // let's remove it from the pool + Connections.TryRemove(connectionItem.Client.ClientId, out _); + // let's create a new one + connectionItem = new ConnectionItem(brokerInfo, _idsPerConnection, await createClient().ConfigureAwait(false)); + Connections.TryAdd(connectionItem.Client.ClientId, connectionItem); + + return connectionItem.Client; } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index 6ffcb1f3..e45c7a6c 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -634,7 +634,6 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.SuperStreamConfig.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.get -> System.TimeSpan RabbitMQ.Stream.Client.Reliable.ProducerConfig.TimeoutMessageAfter.init -> void RabbitMQ.Stream.Client.Reliable.ProducerFactory -RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ProducerFactory.ProducerFactory() -> void RabbitMQ.Stream.Client.Reliable.ProducerFactory._confirmationPipe -> RabbitMQ.Stream.Client.Reliable.ConfirmationPipe RabbitMQ.Stream.Client.Reliable.ProducerFactory._producerConfig -> RabbitMQ.Stream.Client.Reliable.ProducerConfig diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index abc8ca62..36651f15 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -170,6 +170,9 @@ RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.get RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo +RabbitMQ.Stream.Client.RawSuperStreamProducer.ReconnectPartition(string stream) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get -> System.Func +RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy @@ -195,9 +198,12 @@ RabbitMQ.Stream.Client.Reliable.Producer.Info.get -> RabbitMQ.Stream.Client.Prod RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void +RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool RabbitMQ.Stream.Client.Reliable.ReliableBase.IsValidStatus() -> bool RabbitMQ.Stream.Client.Reliable.ReliableBase.LogException(System.Exception exception) -> void +RabbitMQ.Stream.Client.Reliable.ReliableBase.MaybeReconnectPartition(string stream, string info, System.Func reconnectPartitionFunc) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void RabbitMQ.Stream.Client.Reliable.ReliableBase._reconnectStrategy -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index c10d6009..807822a4 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -621,8 +621,7 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => // we call the Close to re-enter to the standard behavior // ignoreIfClosed is an optimization to avoid to send the unsubscribe _config.Pool.RemoveConsumerEntityFromStream(_client.ClientId, EntityId, _config.Stream); - await Close().ConfigureAwait(false); - + await Shutdown(_config, true).ConfigureAwait(false); _config.MetadataHandler?.Invoke(metaDataUpdate); }; diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index 90a6ca83..754dda73 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -164,6 +164,8 @@ private async Task Init() private Client.ConnectionCloseHandler OnConnectionClosed() => async (reason) => { + _client.ConnectionClosed -= OnConnectionClosed(); + _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); _config.Pool.Remove(_client.ClientId); await Shutdown(_config, true).ConfigureAwait(false); if (_config.ConnectionClosedHandler != null) @@ -171,8 +173,6 @@ private Client.ConnectionCloseHandler OnConnectionClosed() => await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); } - // remove the event since the connection is closed - _client.ConnectionClosed -= OnConnectionClosed(); }; private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => @@ -193,8 +193,8 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => // and the DeletePublisher producer is not needed anymore (ignoreIfClosed = true) // we call the Close to re-enter to the standard behavior // ignoreIfClosed is an optimization to avoid to send the DeletePublisher - _config.MetadataHandler?.Invoke(metaDataUpdate); await Shutdown(_config, true).ConfigureAwait(false); + _config.MetadataHandler?.Invoke(metaDataUpdate); }; private bool IsFilteringEnabled => _config.Filter is { FilterValue: not null }; diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index ea2626dd..a372ab78 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -81,6 +81,7 @@ private RawConsumerConfig FromStreamConfig(string stream) ConnectionClosedHandler = async (reason) => { _consumers.TryRemove(stream, out var consumer); + consumer?.Close(); if (consumer != null) { _logger.LogInformation( @@ -94,7 +95,6 @@ private RawConsumerConfig FromStreamConfig(string stream) await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); } - // await GetConsumer(stream).ConfigureAwait(false); } }, MessageHandler = async (consumer, context, message) => @@ -131,7 +131,7 @@ private async Task InitConsumer(string stream) return c; } - private async Task GetConsumer(string stream) + private async Task MaybeAddConsumer(string stream) { if (!_consumers.ContainsKey(stream)) { @@ -144,7 +144,7 @@ private async Task StartConsumers() { foreach (var stream in _streamInfos.Keys) { - await GetConsumer(stream).ConfigureAwait(false); + await MaybeAddConsumer(stream).ConfigureAwait(false); } } @@ -155,7 +155,7 @@ public async Task ReconnectPartition(string stream) { _consumers.TryRemove(stream, out var consumer); consumer?.Close(); - await GetConsumer(stream).ConfigureAwait(false); + await MaybeAddConsumer(stream).ConfigureAwait(false); } finally { diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 23cc2fb1..3f546ac4 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -42,6 +42,7 @@ public class RawSuperStreamProducer : IProducer, IDisposable private readonly IDictionary _streamInfos; private readonly ClientParameters _clientParameters; private readonly ILogger _logger; + private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); public static IProducer Create( RawSuperStreamProducerConfig rawSuperStreamProducerConfig, @@ -89,37 +90,23 @@ private RawProducerConfig FromStreamConfig(string stream) MaxInFlight = _config.MaxInFlight, Filter = _config.Filter, Pool = _config.Pool, - ConnectionClosedHandler = (s) => + ConnectionClosedHandler = async (reason) => { - // In case of connection closed, we need to remove the producer from the list - // We hide the behavior of the producer to the user - // if needed the connection will be created again - _producers.TryRemove(stream, out _); - return Task.CompletedTask; + _producers.TryRemove(stream, out var producer); + producer?.Close(); + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); + } }, - MetadataHandler = update => + MetadataHandler = async update => { - // In case of stream update we remove the producer from the list - // We hide the behavior of the producer to the user - // if needed the connection will be created again - // we "should" always have the stream - - // but we have to handle the case¬ - // We need to wait a bit it can take some time to update the configuration - Thread.Sleep(500); - - var exists = _config.Client.StreamExists(update.Stream); - if (!exists.Result) + _producers.TryRemove(update.Stream, out var producer); + producer?.Close(); + if (_config.MetadataHandler != null) { - // The stream doesn't exist anymore - // but this condition should be avoided since the hash routing - // can be compromised - _logger.LogWarning("Stream {StreamIdentifier} is not available anymore", update.Stream); - _streamInfos.Remove(update.Stream); + await _config.MetadataHandler(update).ConfigureAwait(false); } - - _producers.TryRemove(update.Stream, out var producer); - return Task.CompletedTask; }, ClientProvidedName = _config.ClientProvidedName, BatchSize = _config.BatchSize, @@ -130,7 +117,7 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var p = await RawProducer.Create(_clientParameters with { ClientProvidedName = _config.ClientProvidedName }, + var p = await RawProducer.Create(_clientParameters with {ClientProvidedName = _config.ClientProvidedName}, FromStreamConfig(stream), _streamInfos[stream], _logger) @@ -148,7 +135,7 @@ protected void ThrowIfClosed() } } - private async Task GetProducer(string stream) + private async Task MaybeAddAndGetProducer(string stream) { if (!_producers.ContainsKey(stream)) { @@ -159,6 +146,22 @@ private async Task GetProducer(string stream) return _producers[stream]; } + + public async Task ReconnectPartition(string stream) + { + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + _producers.TryRemove(stream, out var producer); + producer?.Close(); + await MaybeAddAndGetProducer(stream).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } + } + // based on the stream name and the partition key, we select the producer private async Task GetProducerForMessage(Message message) { @@ -172,12 +175,20 @@ private async Task GetProducerForMessage(Message message) // we should always have a route // but in case of stream KEY the routing could not exist - if (routes is not { Count: > 0 }) + if (routes is not {Count: > 0}) { throw new RouteNotFoundException("No route found for the message to any stream"); } - return await GetProducer(routes[0]).ConfigureAwait(false); + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + return await MaybeAddAndGetProducer(routes[0]).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } } public async ValueTask Send(ulong publishingId, Message message) @@ -205,7 +216,7 @@ public async ValueTask Send(List<(ulong, Message)> messages) } else { - aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) })); + aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)})); } } @@ -232,7 +243,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } else { - aggregate.Add((p, new List() { subMessage })); + aggregate.Add((p, new List() {subMessage})); } } @@ -263,7 +274,7 @@ public Task GetLastPublishingId() { foreach (var stream in _streamInfos.Keys.ToList()) { - GetProducer(stream).Wait(); + MaybeAddAndGetProducer(stream).Wait(); } var v = _producers.Values.Min(p => p.GetLastPublishingId().Result); @@ -325,6 +336,9 @@ public RawSuperStreamProducerConfig(string superStream) public RoutingStrategyType RoutingStrategyType { get; set; } = RoutingStrategyType.Hash; + public Func ConnectionClosedHandler { get; set; } + + internal Client Client { get; set; } } @@ -367,7 +381,7 @@ public Task> Route(Message message, List partitions) var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - var r = new List() { partitions[(int)index] }; + var r = new List() {partitions[(int)index]}; return Task.FromResult(r); } @@ -400,8 +414,8 @@ public async Task> Route(Message message, List partitions) var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); _cacheStream[key] = c.Streams; return (from resultStream in c.Streams - where partitions.Contains(resultStream) - select new List() { resultStream }).FirstOrDefault(); + where partitions.Contains(resultStream) + select new List() {resultStream}).FirstOrDefault(); } public KeyRoutingStrategy(Func routingKeyExtractor, diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index af118fea..c63dcee8 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -189,7 +189,7 @@ protected override async Task CreateNewEntity(bool boot) // just close the consumer. See base/metadataupdate protected override async Task CloseEntity() { - await SemaphoreSlim.WaitAsync(Consts.LongWait).ConfigureAwait(false); + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { if (_consumer != null) diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index f13e71d7..c1f74239 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -124,15 +124,16 @@ private async Task SuperConsumer(bool boot) BaseLogger.LogInformation("{Identity} is closed normally", ToString()); return; } - - await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, - _ => MaybeReconnectPartition(partitionStream)) + + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; + await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r) .ConfigureAwait(false); + }, MetadataHandler = async update => { - await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, - _ => MaybeReconnectPartition(update.Stream)) + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; + await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r) .ConfigureAwait(false); }, MessageHandler = async (partitionStream, consumer, ctx, message) => @@ -150,33 +151,6 @@ await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, return _consumer; - async Task MaybeReconnectPartition(string stream) - { - var reconnect = await _reconnectStrategy - .WhenDisconnected($"Super Stream partition: {stream} for {_consumer.Info}").ConfigureAwait(false); - - if (!reconnect) - { - UpdateStatus(ReliableEntityStatus.Closed); - return; - } - - try - { - UpdateStatus(ReliableEntityStatus.Reconnecting); - await ((RawSuperStreamConsumer)_consumer)!.ReconnectPartition(stream).ConfigureAwait(false); - UpdateStatus(ReliableEntityStatus.Open); - await _reconnectStrategy.WhenConnected( - $"Super Stream partition: {stream} for {_consumer.Info}").ConfigureAwait(false); - } - catch (Exception e) - { - LogException(e); - if (ClientExceptions.IsAKnownException(e)) - { - await MaybeReconnectPartition(stream).ConfigureAwait(false); - } - } - } + } } diff --git a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs index 932cae4a..90af1a1e 100644 --- a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs @@ -48,7 +48,7 @@ public BackOffReconnectStrategy(ILogger logger = null) // else the backoff will be too long private void MaybeResetTentatives() { - if (Tentatives > 1000) + if (Tentatives > 5) { Tentatives = 1; } @@ -57,12 +57,14 @@ private void MaybeResetTentatives() public async ValueTask WhenDisconnected(string connectionIdentifier) { Tentatives <<= 1; + var next = Random.Shared.Next(Tentatives * 1000, Tentatives * 2000); _logger.LogInformation( "{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms", connectionIdentifier, - Tentatives * 100 + next ); - await Task.Delay(TimeSpan.FromMilliseconds(Tentatives * 100)).ConfigureAwait(false); + + await Task.Delay(TimeSpan.FromMilliseconds(next)).ConfigureAwait(false); MaybeResetTentatives(); return true; } diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index 744e132c..26a799a5 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -125,7 +125,7 @@ public ProducerConfig(StreamSystem streamSystem, string stream) : base(streamSys /// public class Producer : ProducerFactory { - private IProducer _producer; + private ulong _publishingId; private readonly ILogger _logger; @@ -174,7 +174,7 @@ await rProducer.Init(producerConfig.ReconnectStrategy, producerConfig.ResourceAv protected override async Task CreateNewEntity(bool boot) { - _producer = await CreateProducer().ConfigureAwait(false); + _producer = await CreateProducer(boot).ConfigureAwait(false); await _producerConfig.ReconnectStrategy.WhenConnected(ToString()).ConfigureAwait(false); @@ -191,7 +191,7 @@ protected override async Task CreateNewEntity(bool boot) protected override async Task CloseEntity() { - await SemaphoreSlim.WaitAsync(Consts.LongWait).ConfigureAwait(false); + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { if (_producer != null) @@ -214,7 +214,7 @@ public override async Task Close() } UpdateStatus(ReliableEntityStatus.Closed); - await SemaphoreSlim.WaitAsync(Consts.ShortWait).ConfigureAwait(false); + await SemaphoreSlim.WaitAsync().ConfigureAwait(false); try { _confirmationPipe.Stop(); diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 3964c78a..24acb685 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -15,48 +15,75 @@ namespace RabbitMQ.Stream.Client.Reliable; public abstract class ProducerFactory : ReliableBase { + protected IProducer _producer; protected ProducerConfig _producerConfig; protected ConfirmationPipe _confirmationPipe; - protected async Task CreateProducer() + protected async Task CreateProducer(bool boot) { if (_producerConfig.SuperStreamConfig is { Enabled: true }) { - return await SuperStreamProducer().ConfigureAwait(false); + return await SuperStreamProducer(boot).ConfigureAwait(false); } return await StandardProducer().ConfigureAwait(false); } - private async Task SuperStreamProducer() + private async Task SuperStreamProducer(bool boot) { - return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer( - new RawSuperStreamProducerConfig(_producerConfig.Stream) - { - ClientProvidedName = _producerConfig.ClientProvidedName, - Reference = _producerConfig.Reference, - MessagesBufferSize = _producerConfig.MessagesBufferSize, - MaxInFlight = _producerConfig.MaxInFlight, - Routing = _producerConfig.SuperStreamConfig.Routing, - RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType, - Filter = _producerConfig.Filter, - ConfirmHandler = confirmationHandler => + if (boot) + { + + + + return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer( + new RawSuperStreamProducerConfig(_producerConfig.Stream) { - var (stream, confirmation) = confirmationHandler; - var confirmationStatus = confirmation.Code switch + ClientProvidedName = _producerConfig.ClientProvidedName, + Reference = _producerConfig.Reference, + MessagesBufferSize = _producerConfig.MessagesBufferSize, + MaxInFlight = _producerConfig.MaxInFlight, + Routing = _producerConfig.SuperStreamConfig.Routing, + RoutingStrategyType = _producerConfig.SuperStreamConfig.RoutingStrategyType, + Filter = _producerConfig.Filter, + ConnectionClosedHandler = async (closeReason, partitionStream) => { - ResponseCode.PublisherDoesNotExist => ConfirmationStatus.PublisherDoesNotExist, - ResponseCode.AccessRefused => ConfirmationStatus.AccessRefused, - ResponseCode.InternalError => ConfirmationStatus.InternalError, - ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, - ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, - ResponseCode.Ok => ConfirmationStatus.Confirmed, - _ => ConfirmationStatus.UndefinedError - }; - _confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId, - stream).ConfigureAwait(false); - } - }, BaseLogger).ConfigureAwait(false); + if (closeReason == ConnectionClosedReason.Normal) + { + BaseLogger.LogInformation("{Identity} is closed normally", ToString()); + return; + } + + var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; + await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r) + .ConfigureAwait(false); + + }, + MetadataHandler = async update => + { + var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; + await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r) + .ConfigureAwait(false); + }, + ConfirmHandler = confirmationHandler => + { + var (stream, confirmation) = confirmationHandler; + var confirmationStatus = confirmation.Code switch + { + ResponseCode.PublisherDoesNotExist => ConfirmationStatus.PublisherDoesNotExist, + ResponseCode.AccessRefused => ConfirmationStatus.AccessRefused, + ResponseCode.InternalError => ConfirmationStatus.InternalError, + ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, + ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, + ResponseCode.Ok => ConfirmationStatus.Confirmed, + _ => ConfirmationStatus.UndefinedError + }; + _confirmationPipe.RemoveUnConfirmedMessage(confirmationStatus, confirmation.PublishingId, + stream).ConfigureAwait(false); + } + }, BaseLogger).ConfigureAwait(false); + } + return _producer; } private async Task StandardProducer() diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index 8b977737..e6782561 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -225,6 +225,36 @@ private async Task MaybeReconnect() break; } } + + + protected async Task MaybeReconnectPartition(string stream, string info, Func reconnectPartitionFunc) + { + var reconnect = await _reconnectStrategy + .WhenDisconnected($"Super Stream partition: {stream} for {info}").ConfigureAwait(false); + + if (!reconnect) + { + UpdateStatus(ReliableEntityStatus.Closed); + return; + } + + try + { + UpdateStatus(ReliableEntityStatus.Reconnecting); + await reconnectPartitionFunc(stream).ConfigureAwait(false); + UpdateStatus(ReliableEntityStatus.Open); + await _reconnectStrategy.WhenConnected( + $"Super Stream partition: {stream} for {info}").ConfigureAwait(false); + } + catch (Exception e) + { + LogException(e); + if (ClientExceptions.IsAKnownException(e)) + { + await MaybeReconnectPartition(stream, info, reconnectPartitionFunc).ConfigureAwait(false); + } + } + } /// /// When the clients receives a meta data update, it doesn't know @@ -260,7 +290,7 @@ protected void LogException(Exception exception) /// protected abstract Task CloseEntity(); - internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectFunc) + internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectPartitionFunc) { var streamExists = false; await SemaphoreSlim.WaitAsync().ConfigureAwait(false); @@ -270,7 +300,8 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, Func LookupRandomConnection(ClientParameters client { var brokers = new List() { metaDataInfo.Leader }; brokers.AddRange(metaDataInfo.Replicas); - // brokers.Sort((_, _) => Random.Shared.Next(-1, 1)); var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); var exceptions = new List(); foreach (var broker in br) From d5e31302eadd666b89917bf56c000d367ce722bd Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 16 Jan 2024 21:43:09 +0100 Subject: [PATCH 3/9] Reconnect the super stream partion * Reconnect the super stream partition with the stream info the stream could change the topology. The new stream info need to be passed to the producer or consumer * Expose stream_info it gives info about the stream location like: leader and followers Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/ClientExceptions.cs | 9 +- RabbitMQ.Stream.Client/ConnectionsPool.cs | 8 +- RabbitMQ.Stream.Client/IClient.cs | 2 + .../PublicAPI.Unshipped.txt | 10 +- RabbitMQ.Stream.Client/RawConsumer.cs | 4 +- RabbitMQ.Stream.Client/RawProducer.cs | 13 +-- .../RawSuperStreamConsumer.cs | 39 ++++--- .../RawSuperStreamProducer.cs | 64 +++++++---- .../Reliable/ConfirmationPipe.cs | 2 +- RabbitMQ.Stream.Client/Reliable/Consumer.cs | 2 - .../Reliable/ConsumerFactory.cs | 10 +- .../Reliable/IReconnectStrategy.cs | 19 +++- RabbitMQ.Stream.Client/Reliable/Producer.cs | 6 - .../Reliable/ProducerFactory.cs | 11 +- .../Reliable/ReliableBase.cs | 104 +++++++++--------- RabbitMQ.Stream.Client/RoutingClient.cs | 6 +- RabbitMQ.Stream.Client/StreamSystem.cs | 4 +- Tests/SuperStreamProducerTests.cs | 86 ++++++++++----- Tests/UnitTests.cs | 2 + 19 files changed, 240 insertions(+), 161 deletions(-) diff --git a/RabbitMQ.Stream.Client/ClientExceptions.cs b/RabbitMQ.Stream.Client/ClientExceptions.cs index 2e117407..587b8269 100644 --- a/RabbitMQ.Stream.Client/ClientExceptions.cs +++ b/RabbitMQ.Stream.Client/ClientExceptions.cs @@ -30,12 +30,15 @@ internal static bool IsAKnownException(Exception exception) if (exception is AggregateException aggregateException) { var x = aggregateException.InnerExceptions.Select(x => - x.GetType() == typeof(SocketException) || x.GetType() == typeof(TimeoutException) || - x.GetType() == typeof(LeaderNotFoundException) || x.GetType() == typeof(InvalidOperationException)); + x.GetType() == typeof(SocketException) || + x.GetType() == typeof(TimeoutException) || + x.GetType() == typeof(LeaderNotFoundException) || + x.GetType() == typeof(OperationCanceledException) || + x.GetType() == typeof(InvalidOperationException)); return x.Any(); } - return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException) || + return exception is (SocketException or TimeoutException or LeaderNotFoundException or InvalidOperationException or OperationCanceledException) || IsStreamNotAvailable(exception); } diff --git a/RabbitMQ.Stream.Client/ConnectionsPool.cs b/RabbitMQ.Stream.Client/ConnectionsPool.cs index 8e0c3a87..2a445f65 100644 --- a/RabbitMQ.Stream.Client/ConnectionsPool.cs +++ b/RabbitMQ.Stream.Client/ConnectionsPool.cs @@ -154,8 +154,10 @@ internal async Task GetOrCreateClient(string brokerInfo, Func x.BrokerInfo == brokerInfo && x.Available); connectionItem.LastUsed = DateTime.UtcNow; - if ((Client)(connectionItem.Client) is not {IsClosed: true}) return connectionItem.Client; - + + if (connectionItem.Client is not { IsClosed: true }) + return connectionItem.Client; + // the connection is closed // let's remove it from the pool Connections.TryRemove(connectionItem.Client.ClientId, out _); @@ -163,7 +165,6 @@ internal async Task GetOrCreateClient(string brokerInfo, Func GetOrCreateClient(string brokerInfo, Func>, Action<(ulong, ResponseCode)[]>))> Publishers { get; } IDictionary Consumers { get; } + + public bool IsClosed { get; } } } diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 36651f15..09f0461f 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -98,6 +98,7 @@ RabbitMQ.Stream.Client.HashRoutingMurmurStrategy.Route(RabbitMQ.Stream.Client.Me RabbitMQ.Stream.Client.IClient.ClientId.get -> string RabbitMQ.Stream.Client.IClient.ClientId.init -> void RabbitMQ.Stream.Client.IClient.Consumers.get -> System.Collections.Generic.IDictionary +RabbitMQ.Stream.Client.IClient.IsClosed.get -> bool RabbitMQ.Stream.Client.IClient.Publishers.get -> System.Collections.Generic.IDictionary>, System.Action<(ulong, RabbitMQ.Stream.Client.ResponseCode)[]>))> RabbitMQ.Stream.Client.IClosable RabbitMQ.Stream.Client.IClosable.Close() -> System.Threading.Tasks.Task @@ -165,12 +166,12 @@ RabbitMQ.Stream.Client.RawConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo RabbitMQ.Stream.Client.RawSuperStreamConsumer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo -RabbitMQ.Stream.Client.RawSuperStreamConsumer.ReconnectPartition(string stream) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.RawSuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.get -> System.Func RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducer.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducer.Info.get -> RabbitMQ.Stream.Client.ProducerInfo -RabbitMQ.Stream.Client.RawSuperStreamProducer.ReconnectPartition(string stream) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.RawSuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get -> System.Func RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType @@ -201,11 +202,7 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer RabbitMQ.Stream.Client.Reliable.ReliableBase.CompareStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus toTest) -> bool -RabbitMQ.Stream.Client.Reliable.ReliableBase.IsValidStatus() -> bool -RabbitMQ.Stream.Client.Reliable.ReliableBase.LogException(System.Exception exception) -> void -RabbitMQ.Stream.Client.Reliable.ReliableBase.MaybeReconnectPartition(string stream, string info, System.Func reconnectPartitionFunc) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void -RabbitMQ.Stream.Client.Reliable.ReliableBase._reconnectStrategy -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reconnect.IReconnectStrategy @@ -242,6 +239,7 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.G RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary statistic) -> void RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.set -> void diff --git a/RabbitMQ.Stream.Client/RawConsumer.cs b/RabbitMQ.Stream.Client/RawConsumer.cs index 807822a4..bb8fc1d5 100644 --- a/RabbitMQ.Stream.Client/RawConsumer.cs +++ b/RabbitMQ.Stream.Client/RawConsumer.cs @@ -143,7 +143,7 @@ private RawConsumer(Client client, RawConsumerConfig config, ILogger logger = nu Logger = logger ?? NullLogger.Instance; _initialCredits = config.InitialCredits; _config = config; - Logger.LogDebug("Creating... {ConsumerInfo}", DumpEntityConfiguration()); + Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); Info = new ConsumerInfo(_config.Stream, _config.Reference); // _chunksBuffer is a channel that is used to buffer the chunks _chunksBuffer = Channel.CreateBounded(new BoundedChannelOptions(_initialCredits) @@ -472,7 +472,7 @@ await _chunksBuffer.Reader.WaitToReadAsync(Token).ConfigureAwait(false)) // }, Token); } - internal async Task Init() + private async Task Init() { _config.Validate(); diff --git a/RabbitMQ.Stream.Client/RawProducer.cs b/RabbitMQ.Stream.Client/RawProducer.cs index 754dda73..74a1b3e3 100644 --- a/RabbitMQ.Stream.Client/RawProducer.cs +++ b/RabbitMQ.Stream.Client/RawProducer.cs @@ -57,7 +57,7 @@ public class RawProducer : AbstractEntity, IProducer, IDisposable protected override string GetStream() => _config.Stream; - protected override string DumpEntityConfiguration() + protected sealed override string DumpEntityConfiguration() { return $"Producer id {EntityId} for stream: {_config.Stream}, reference: {_config.Reference}," + @@ -88,6 +88,8 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu _client = client; _config = config; Info = new ProducerInfo(_config.Stream, _config.Reference); + Logger = logger ?? NullLogger.Instance; + Logger.LogDebug("Creating... {DumpEntityConfiguration}", DumpEntityConfiguration()); _messageBuffer = Channel.CreateBounded(new BoundedChannelOptions(10000) { AllowSynchronousContinuations = false, @@ -95,7 +97,6 @@ private RawProducer(Client client, RawProducerConfig config, ILogger logger = nu SingleWriter = false, FullMode = BoundedChannelFullMode.Wait }); - Logger = logger ?? NullLogger.Instance; Task.Run(ProcessBuffer); _semaphore = new SemaphoreSlim(config.MaxInFlight, config.MaxInFlight); } @@ -167,12 +168,11 @@ private Client.ConnectionCloseHandler OnConnectionClosed() => _client.ConnectionClosed -= OnConnectionClosed(); _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); _config.Pool.Remove(_client.ClientId); - await Shutdown(_config, true).ConfigureAwait(false); + UpdateStatusToClosed(); if (_config.ConnectionClosedHandler != null) { await _config.ConnectionClosedHandler(reason).ConfigureAwait(false); } - }; private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => @@ -187,12 +187,11 @@ private ClientParameters.MetadataUpdateHandler OnMetadataUpdate() => _client.ConnectionClosed -= OnConnectionClosed(); _client.Parameters.OnMetadataUpdate -= OnMetadataUpdate(); - _config.Pool.RemoveProducerEntityFromStream(_client.ClientId, EntityId, _config.Stream); - // at this point the server has removed the producer from the list // and the DeletePublisher producer is not needed anymore (ignoreIfClosed = true) // we call the Close to re-enter to the standard behavior // ignoreIfClosed is an optimization to avoid to send the DeletePublisher + _config.Pool.RemoveProducerEntityFromStream(_client.ClientId, EntityId, _config.Stream); await Shutdown(_config, true).ConfigureAwait(false); _config.MetadataHandler?.Invoke(metaDataUpdate); }; @@ -364,7 +363,7 @@ private async Task ProcessBuffer() } catch (Exception e) { - Logger.LogError(e, "error while Process Buffer"); + Logger.LogError(e, "{DumpEntityConfiguration} Error while Process Buffer", DumpEntityConfiguration()); } } diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index a372ab78..b3739bb5 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -81,20 +81,29 @@ private RawConsumerConfig FromStreamConfig(string stream) ConnectionClosedHandler = async (reason) => { _consumers.TryRemove(stream, out var consumer); - consumer?.Close(); - if (consumer != null) + if (reason == ConnectionClosedReason.Normal) { - _logger.LogInformation( - "Consumer {ConsumerReference} is disconnected from {StreamIdentifier} reason {Reason}", - _config.Reference, + _logger.LogDebug( + "Super Stream consumer {@ConsumerInfo} is closed normally from {StreamIdentifier}", + consumer?.Info, + stream + ); + } + else + { + _logger.LogWarning( + "Super Stream consumer {@ConsumerInfo} is disconnected from {StreamIdentifier} reason: {Reason}", + consumer?.Info, stream, reason ); + } - if (_config.ConnectionClosedHandler != null) - { - await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); - } + consumer?.Dispose(); + _streamInfos.TryRemove(stream, out _); + if (_config.ConnectionClosedHandler != null) + { + await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); } }, MessageHandler = async (consumer, context, message) => @@ -112,6 +121,7 @@ private RawConsumerConfig FromStreamConfig(string stream) { _consumers.TryRemove(update.Stream, out var consumer); consumer?.Close(); + _streamInfos.TryRemove(update.Stream, out _); if (_config.MetadataHandler != null) { await _config.MetadataHandler(update).ConfigureAwait(false); @@ -126,7 +136,7 @@ private async Task InitConsumer(string stream) var c = await RawConsumer.Create( _clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName }, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false); - _logger?.LogDebug("Consumer {ConsumerReference} created for Stream {StreamIdentifier}", _config.Reference, + _logger?.LogDebug("Super stream consumer {ConsumerReference} created for Stream {StreamIdentifier}", c.Info, stream); return c; } @@ -148,14 +158,15 @@ private async Task StartConsumers() } } - public async Task ReconnectPartition(string stream) + public async Task ReconnectPartition(StreamInfo streamInfo) { await _semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - _consumers.TryRemove(stream, out var consumer); - consumer?.Close(); - await MaybeAddConsumer(stream).ConfigureAwait(false); + _consumers.TryRemove(streamInfo.Stream, out var consumer); + consumer?.Dispose(); + _streamInfos.TryAdd(streamInfo.Stream, streamInfo); // add the new stream infos + await MaybeAddConsumer(streamInfo.Stream).ConfigureAwait(false); } finally { diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 3f546ac4..1ba2fd6c 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -39,7 +39,9 @@ public class RawSuperStreamProducer : IProducer, IDisposable // For example: // invoices(super_stream) -> invoices-0, invoices-1, invoices-2 // Streams contains the configuration for each stream but not the connection + private readonly IDictionary _streamInfos; + private readonly ClientParameters _clientParameters; private readonly ILogger _logger; private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); @@ -92,8 +94,20 @@ private RawProducerConfig FromStreamConfig(string stream) Pool = _config.Pool, ConnectionClosedHandler = async (reason) => { - _producers.TryRemove(stream, out var producer); - producer?.Close(); + _producers.TryGetValue(stream, out var producer); + if (reason == ConnectionClosedReason.Normal) + { + _logger.LogDebug("Super Stream producer {@ProducerInfo} is closed normally", producer?.Info); + } + else + { + _logger.LogWarning( + "Super Stream producer {@ProducerInfo} is disconnected from {StreamIdentifier} reason: {Reason}", + producer?.Info, + stream, reason + ); + } + if (_config.ConnectionClosedHandler != null) { await _config.ConnectionClosedHandler(reason, stream).ConfigureAwait(false); @@ -101,8 +115,6 @@ private RawProducerConfig FromStreamConfig(string stream) }, MetadataHandler = async update => { - _producers.TryRemove(update.Stream, out var producer); - producer?.Close(); if (_config.MetadataHandler != null) { await _config.MetadataHandler(update).ConfigureAwait(false); @@ -117,17 +129,18 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var p = await RawProducer.Create(_clientParameters with {ClientProvidedName = _config.ClientProvidedName}, + var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }).First(i => i.Item == stream).Index; + var p = await RawProducer.Create(_clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" }, FromStreamConfig(stream), _streamInfos[stream], _logger) .ConfigureAwait(false); - _logger?.LogDebug("Producer {ProducerReference} created for Stream {StreamIdentifier}", _config.Reference, + _logger?.LogDebug("Super stream producer {@ProducerReference} created for Stream {StreamIdentifier}", p.Info, stream); return p; } - protected void ThrowIfClosed() + private void ThrowIfClosed() { if (!IsOpen()) { @@ -146,15 +159,19 @@ private async Task MaybeAddAndGetProducer(string stream) return _producers[stream]; } - - public async Task ReconnectPartition(string stream) + public async Task ReconnectPartition(StreamInfo streamInfo) { await _semaphoreSlim.WaitAsync().ConfigureAwait(false); try { - _producers.TryRemove(stream, out var producer); + _producers.TryRemove(streamInfo.Stream, out var producer); producer?.Close(); - await MaybeAddAndGetProducer(stream).ConfigureAwait(false); + if (!_streamInfos.TryGetValue(streamInfo.Stream, out _)) + { + _streamInfos.TryAdd(streamInfo.Stream, streamInfo); + } + + await MaybeAddAndGetProducer(streamInfo.Stream).ConfigureAwait(false); } finally { @@ -175,7 +192,7 @@ private async Task GetProducerForMessage(Message message) // we should always have a route // but in case of stream KEY the routing could not exist - if (routes is not {Count: > 0}) + if (routes is not { Count: > 0 }) { throw new RouteNotFoundException("No route found for the message to any stream"); } @@ -195,7 +212,15 @@ public async ValueTask Send(ulong publishingId, Message message) { ThrowIfClosed(); var producer = await GetProducerForMessage(message).ConfigureAwait(false); - await producer.Send(publishingId, message).ConfigureAwait(false); + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + await producer.Send(publishingId, message).ConfigureAwait(false); + } + finally + { + _semaphoreSlim.Release(); + } } public async ValueTask Send(List<(ulong, Message)> messages) @@ -216,7 +241,7 @@ public async ValueTask Send(List<(ulong, Message)> messages) } else { - aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)})); + aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) })); } } @@ -243,7 +268,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } else { - aggregate.Add((p, new List() {subMessage})); + aggregate.Add((p, new List() { subMessage })); } } @@ -291,7 +316,7 @@ public void Dispose() { foreach (var (_, iProducer) in _producers) { - iProducer.Close(); + iProducer.Dispose(); } _disposed = true; @@ -338,7 +363,6 @@ public RawSuperStreamProducerConfig(string superStream) public Func ConnectionClosedHandler { get; set; } - internal Client Client { get; set; } } @@ -381,7 +405,7 @@ public Task> Route(Message message, List partitions) var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - var r = new List() {partitions[(int)index]}; + var r = new List() { partitions[(int)index] }; return Task.FromResult(r); } @@ -414,8 +438,8 @@ public async Task> Route(Message message, List partitions) var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); _cacheStream[key] = c.Streams; return (from resultStream in c.Streams - where partitions.Contains(resultStream) - select new List() {resultStream}).FirstOrDefault(); + where partitions.Contains(resultStream) + select new List() { resultStream }).FirstOrDefault(); } public KeyRoutingStrategy(Func routingKeyExtractor, diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs index cb23e1c4..62d7d0d9 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs @@ -125,7 +125,7 @@ private async void OnTimedEvent(object sender, ElapsedEventArgs e) foreach (var pair in timedOutMessages) { - await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, null) + await RemoveUnConfirmedMessage(ConfirmationStatus.ClientTimeoutError, pair.Value.PublishingId, pair.Value.Stream) .ConfigureAwait(false); } } diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs index c63dcee8..eedc273e 100644 --- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs @@ -175,8 +175,6 @@ public static async Task Create(ConsumerConfig consumerConfig, ILogger var rConsumer = new Consumer(consumerConfig, logger); await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy) .ConfigureAwait(false); - logger?.LogDebug("Consumer: {Reference} created for Stream: {Stream}", - consumerConfig.Reference, consumerConfig.Stream); return rConsumer; } diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs index c1f74239..f9a6b10a 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs @@ -2,7 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; using System.Collections.Concurrent; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -44,6 +43,10 @@ private async Task StandardConsumer(bool boot) offsetSpec = new OffsetTypeOffset(_lastOffsetConsumed[_consumerConfig.Stream] + 1); } + // before creating a new consumer, the old one is disposed + // This is just a safety check, the consumer should be already disposed + _consumer?.Dispose(); + return await _consumerConfig.StreamSystem.CreateRawConsumer(new RawConsumerConfig(_consumerConfig.Stream) { ClientProvidedName = _consumerConfig.ClientProvidedName, @@ -124,11 +127,10 @@ private async Task SuperConsumer(bool boot) BaseLogger.LogInformation("{Identity} is closed normally", ToString()); return; } - + var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition; await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r) .ConfigureAwait(false); - }, MetadataHandler = async update => { @@ -150,7 +152,5 @@ await _consumerConfig.MessageHandler(partitionStream, consumer, ctx, } return _consumer; - - } } diff --git a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs index 90af1a1e..92dbbf53 100644 --- a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs @@ -56,17 +56,28 @@ private void MaybeResetTentatives() public async ValueTask WhenDisconnected(string connectionIdentifier) { + Tentatives <<= 1; - var next = Random.Shared.Next(Tentatives * 1000, Tentatives * 2000); _logger.LogInformation( "{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms", connectionIdentifier, - next + Tentatives * 100 ); - - await Task.Delay(TimeSpan.FromMilliseconds(next)).ConfigureAwait(false); + await Task.Delay(TimeSpan.FromMilliseconds(Tentatives * 100)).ConfigureAwait(false); MaybeResetTentatives(); return true; + // this will be in another commit + // Tentatives <<= 1; + // var next = Random.Shared.Next(Tentatives * 1000, Tentatives * 2000); + // _logger.LogInformation( + // "{ConnectionIdentifier} disconnected, check if reconnection needed in {ReconnectionDelayMs} ms", + // connectionIdentifier, + // next + // ); + // + // await Task.Delay(TimeSpan.FromMilliseconds(next)).ConfigureAwait(false); + // MaybeResetTentatives(); + // return true; } public ValueTask WhenConnected(string connectionIdentifier) diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index 26a799a5..1fcb0536 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -163,12 +163,6 @@ public static async Task Create(ProducerConfig producerConfig, ILogger var rProducer = new Producer(producerConfig, logger); await rProducer.Init(producerConfig.ReconnectStrategy, producerConfig.ResourceAvailableReconnectStrategy) .ConfigureAwait(false); - logger?.LogDebug( - "Producer: {Reference} created for Stream: {Stream}", - producerConfig.Reference, - producerConfig.Stream - ); - return rProducer; } diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 24acb685..afd4027f 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -34,8 +34,6 @@ private async Task SuperStreamProducer(bool boot) if (boot) { - - return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer( new RawSuperStreamProducerConfig(_producerConfig.Stream) { @@ -50,7 +48,7 @@ private async Task SuperStreamProducer(bool boot) { if (closeReason == ConnectionClosedReason.Normal) { - BaseLogger.LogInformation("{Identity} is closed normally", ToString()); + BaseLogger.LogDebug("{Identity} is closed normally", ToString()); return; } @@ -83,11 +81,16 @@ await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r) } }, BaseLogger).ConfigureAwait(false); } + return _producer; } private async Task StandardProducer() { + // before creating a new producer, the old one is disposed + // This is just a safety check, the producer should be already disposed + _producer?.Dispose(); + return await _producerConfig.StreamSystem.CreateRawProducer(new RawProducerConfig(_producerConfig.Stream) { ClientProvidedName = _producerConfig.ClientProvidedName, @@ -102,7 +105,7 @@ private async Task StandardProducer() { if (closeReason == ConnectionClosedReason.Normal) { - BaseLogger.LogInformation("{Identity} is closed normally", ToString()); + BaseLogger.LogDebug("{Identity} is closed normally", ToString()); return; } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs index e6782561..bb885514 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs @@ -66,7 +66,7 @@ protected bool CompareStatus(ReliableEntityStatus toTest) } } - protected bool IsValidStatus() + private bool IsValidStatus() { lock (_lock) { @@ -76,7 +76,7 @@ protected bool IsValidStatus() } protected abstract ILogger BaseLogger { get; } - protected IReconnectStrategy _reconnectStrategy; + private IReconnectStrategy _reconnectStrategy; private IReconnectStrategy _resourceAvailableReconnectStrategy; internal async Task Init(IReconnectStrategy reconnectStrategy, @@ -103,27 +103,20 @@ private async Task MaybeInit(bool boot) { if (boot) { + BaseLogger.LogError("{Identity} Error during the first boot {EMessage}", + ToString(), e.Message); // if it is the first boot we don't need to reconnect UpdateStatus(ReliableEntityStatus.Closed); throw; } - reconnect = ClientExceptions.IsAKnownException(e); - + reconnect = true; LogException(e); - if (!reconnect) - { - // We consider the client as closed - // since the exception is raised to the caller - UpdateStatus(ReliableEntityStatus.Closed); - throw; - } + } if (reconnect) - { await MaybeReconnect().ConfigureAwait(false); - } } // @@ -134,10 +127,9 @@ private async Task Init(bool boot) { if (!boot && !IsValidStatus()) { - BaseLogger.LogInformation("{Identity} is already closed", ToString()); + BaseLogger.LogDebug("{Identity} is already closed. The init will be skipped", ToString()); return; } - // each time that the client is initialized, we need to reset the status // if we hare here it means that the entity is not open for some reason like: // first time initialization or reconnect due of a IsAKnownException @@ -161,6 +153,14 @@ private async Task Init(bool boot) /// protected abstract Task CreateNewEntity(bool boot); + /// + /// When the clients receives a meta data update, it doesn't know + /// If the stream exists or not. It just knows that the stream topology has changed. + /// the method CheckIfStreamIsAvailable checks if the stream exists. + /// + /// stream name + /// stream system + /// private async Task CheckIfStreamIsAvailable(string stream, StreamSystem system) { await Task.Delay(Consts.RandomMid()).ConfigureAwait(false); @@ -183,18 +183,20 @@ await _resourceAvailableReconnectStrategy.WhenConnected($"{stream} is {available } } - if (!exists) - { - // In this case the stream doesn't exist anymore - // the Entity is just closed. - BaseLogger.LogInformation( - "Meta data update stream: {StreamIdentifier}. The stream doesn't exist anymore {Identity} will be closed", - stream, - ToString() - ); - } + if (exists) + return true; + // In this case the stream doesn't exist anymore or it failed to check if the stream exists + // too many tentatives for the reconnection strategy + // the Entity is just closed. + var msg = tryAgain ? "The stream doesn't exist anymore" : "Failed to check if the stream exists"; + + BaseLogger.LogInformation( + "Meta data update stream: {StreamIdentifier}. {Msg} {Identity} will be closed", + stream, msg, + ToString() + ); - return exists; + return false; } // @@ -206,6 +208,7 @@ private async Task MaybeReconnect() var reconnect = await _reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false); if (!reconnect) { + BaseLogger.LogDebug("{Identity} is closed due of reconnect strategy", ToString()); UpdateStatus(ReliableEntityStatus.Closed); return; } @@ -219,21 +222,21 @@ private async Task MaybeReconnect() case false: if (CompareStatus(ReliableEntityStatus.Reconnecting)) { - BaseLogger.LogInformation("{Identity} is in Reconnecting", ToString()); + BaseLogger.LogDebug("{Identity} is in Reconnecting", ToString()); } break; } } - - - protected async Task MaybeReconnectPartition(string stream, string info, Func reconnectPartitionFunc) + + private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info, Func reconnectPartitionFunc) { var reconnect = await _reconnectStrategy - .WhenDisconnected($"Super Stream partition: {stream} for {info}").ConfigureAwait(false); + .WhenDisconnected($"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false); if (!reconnect) { + BaseLogger.LogDebug("{Identity} partition is closed due of reconnect strategy", ToString()); UpdateStatus(ReliableEntityStatus.Closed); return; } @@ -241,33 +244,19 @@ protected async Task MaybeReconnectPartition(string stream, string info, Func - /// When the clients receives a meta data update, it doesn't know - /// the reason. - /// Metadata update can be raised when: - /// - stream is deleted - /// - change the stream topology (ex: add a follower) - /// - /// HandleMetaDataMaybeReconnect checks if the stream still exists - /// and try to reconnect. - /// (internal because it is needed for tests) - /// - protected void LogException(Exception exception) + private void LogException(Exception exception) { const string KnownExceptionTemplate = "{Identity} trying to reconnect due to exception {Err}"; const string UnknownExceptionTemplate = "{Identity} received an exception during initialization"; @@ -290,7 +279,13 @@ protected void LogException(Exception exception) /// protected abstract Task CloseEntity(); - internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectPartitionFunc) + /// + /// Handle the partition reconnection in case of super stream entity + /// + /// Stream System + /// Partition Stream + /// Function to reconnect the partition + internal async Task OnEntityClosed(StreamSystem system, string stream, Func reconnectPartitionFunc) { var streamExists = false; await SemaphoreSlim.WaitAsync().ConfigureAwait(false); @@ -300,8 +295,8 @@ internal async Task OnEntityClosed(StreamSystem system, string stream, Func + /// Handle the regular stream reconnection + /// + /// Stream system + /// Stream internal async Task OnEntityClosed(StreamSystem system, string stream) { var streamExists = false; diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs index fa3fde26..9a22a717 100644 --- a/RabbitMQ.Stream.Client/RoutingClient.cs +++ b/RabbitMQ.Stream.Client/RoutingClient.cs @@ -170,15 +170,17 @@ public static async Task LookupRandomConnection(ClientParameters client { var brokers = new List() { metaDataInfo.Leader }; brokers.AddRange(metaDataInfo.Replicas); - var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); var exceptions = new List(); + var br = brokers.OrderBy(x => Random.Shared.Next()).ToList(); + foreach (var broker in br) { try { return await pool.GetOrCreateClient(broker.ToString(), async () => - await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), logger) + await LookupConnection(clientParameters, broker, MaxAttempts(metaDataInfo), + logger) .ConfigureAwait(false)).ConfigureAwait(false); } catch (Exception ex) diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 88a501d5..4e3763e8 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -284,8 +284,6 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf var p = await RawProducer.Create(s, rawProducerConfig, metaStreamInfo, logger).ConfigureAwait(false); - _logger?.LogDebug("Raw Producer: {Reference} created for Stream: {Stream}", - rawProducerConfig.Reference, rawProducerConfig.Stream); return p; } finally @@ -294,7 +292,7 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf } } - private async Task StreamInfo(string streamName) + public async Task StreamInfo(string streamName) { // force localhost connection for single node clusters and when address resolver is not provided // when theres 1 endpoint and an address resolver, there could be a cluster behind a load balancer diff --git a/Tests/SuperStreamProducerTests.cs b/Tests/SuperStreamProducerTests.cs index fd7a3cc6..b337dac7 100644 --- a/Tests/SuperStreamProducerTests.cs +++ b/Tests/SuperStreamProducerTests.cs @@ -143,7 +143,6 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste { Routing = message1 => message1.Properties.MessageId.ToString(), Reference = "reference", - }); Assert.True(streamProducer.MessagesSent == 0); Assert.True(streamProducer.ConfirmFrames == 0); @@ -421,16 +420,27 @@ public async void SendMessageToSuperStreamRecreateConnectionsIfKilled() SystemUtils.ResetSuperStreams(); // This test validates that the super stream producer is able to recreate the connection // if the connection is killed - // It is NOT meant to test the availability of the super stream producer - // just the reconnect mechanism + // we use the connection closed handler to recreate the connection + // the method ReconnectPartition is used to reconnect the partition var system = await StreamSystem.Create(new StreamSystemConfig()); var clientName = Guid.NewGuid().ToString(); - var streamProducer = - await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange) - { - Routing = message1 => message1.Properties.MessageId.ToString(), - ClientProvidedName = clientName - }); + RawSuperStreamProducer streamProducer = null; + + var c = new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange) + { + Routing = message1 => message1.Properties.MessageId.ToString(), + ClientProvidedName = clientName, + }; + var completed = new TaskCompletionSource(); + streamProducer = (RawSuperStreamProducer)await system.CreateRawSuperStreamProducer(c); + c.ConnectionClosedHandler = async (reason, stream) => + { + var streamInfo = await system.StreamInfo(stream); + await streamProducer.ReconnectPartition(streamInfo); + SystemUtils.Wait(); + completed.SetResult(true); + }; + for (ulong i = 0; i < 20; i++) { var message = new Message(Encoding.Default.GetBytes("hello")) @@ -440,8 +450,8 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste if (i == 10) { - SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientName).Result == 3); - // We just decide to close the connections + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientName}_0").Result == 1); + completed.Task.Wait(); } // Here the connection _must_ be recreated and the send the message @@ -453,7 +463,7 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste // according to the routing strategy hello{i} that must be the correct routing SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 9); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount("invoices-2") == 4); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); Assert.True(await streamProducer.Close() == ResponseCode.Ok); await system.Close(); } @@ -533,7 +543,7 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste } } }); - for (ulong i = 0; i < 10; i++) + for (ulong i = 0; i < 20; i++) { var message = new Message(Encoding.Default.GetBytes("hello")) { @@ -549,12 +559,21 @@ await system.CreateRawSuperStreamProducer(new RawSuperStreamProducerConfig(Syste } Thread.Sleep(200); - - await streamProducer.Send(i, message); + try + { + await streamProducer.Send(i, message); + } + catch (Exception e) + { + Assert.True(e is AlreadyClosedException); + } } SystemUtils.Wait(); new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + // even we removed a stream the producer should be able to send messages and maintain the hash routing + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); Assert.True(await streamProducer.Close() == ResponseCode.Ok); await system.Close(); } @@ -750,13 +769,29 @@ public async void ReliableProducerSendMessageConnectionsIfKilled() // just the reconnect mechanism var system = await StreamSystem.Create(new StreamSystemConfig()); var clientName = Guid.NewGuid().ToString(); + var testPassed = new TaskCompletionSource() { }; + var received = 0; + var error = 0; var streamProducer = await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange) { - SuperStreamConfig = new SuperStreamConfig() + SuperStreamConfig = + new SuperStreamConfig() { Routing = message1 => message1.Properties.MessageId.ToString() }, + TimeoutMessageAfter = TimeSpan.FromSeconds(1), + ConfirmationHandler = async confirmation => { - Routing = message1 => message1.Properties.MessageId.ToString() + if (confirmation.Status != ConfirmationStatus.Confirmed) + { + Interlocked.Increment(ref error); + } + + if (Interlocked.Increment(ref received) == 20) + { + testPassed.SetResult(true); + } + + await Task.CompletedTask; }, - ClientProvidedName = clientName + ClientProvidedName = clientName, }); for (ulong i = 0; i < 20; i++) { @@ -764,14 +799,11 @@ public async void ReliableProducerSendMessageConnectionsIfKilled() { Properties = new Properties() { MessageId = $"hello{i}" } }; - if (i == 10) { - SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientName).Result == 3); // We just decide to close the connections - // we just wait a bit to be sure that the connections - // will be re-opened - SystemUtils.Wait(TimeSpan.FromSeconds(1)); + // The messages will go in time out since not confirmed + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientName}_0").Result == 1); } // Here the connection _must_ be recreated and the message sent @@ -779,11 +811,13 @@ public async void ReliableProducerSendMessageConnectionsIfKilled() } SystemUtils.Wait(); - // Total messages must be 20 - // according to the routing strategy hello{i} that must be the correct routing - SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) == 9); + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + // killed the connection for the InvoicesStream0. So received + error must be 9 + SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream0) + error == 9); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7); SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4); + + await streamProducer.Close(); await system.Close(); } } diff --git a/Tests/UnitTests.cs b/Tests/UnitTests.cs index 34dabc26..7e109f02 100644 --- a/Tests/UnitTests.cs +++ b/Tests/UnitTests.cs @@ -32,10 +32,12 @@ public Task Close(string reason) } public IDictionary Consumers { get; } + public bool IsClosed { get; } public FakeClient(ClientParameters clientParameters) { Parameters = clientParameters; + IsClosed = false; Publishers = new Dictionary>, Action<(ulong, ResponseCode)[]>))>(); Consumers = new Dictionary(); From 2893c1b56e252b22897a70f053b7203c902a217e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 16 Jan 2024 21:53:43 +0100 Subject: [PATCH 4/9] optimization Signed-off-by: Gabriele Santomaggio --- .../RawSuperStreamProducer.cs | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 1ba2fd6c..74117dd3 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -129,8 +129,10 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }).First(i => i.Item == stream).Index; - var p = await RawProducer.Create(_clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" }, + var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index}) + .First(i => i.Item == stream).Index; + var p = await RawProducer.Create( + _clientParameters with {ClientProvidedName = $"{_config.ClientProvidedName}_{index}"}, FromStreamConfig(stream), _streamInfos[stream], _logger) @@ -166,11 +168,7 @@ public async Task ReconnectPartition(StreamInfo streamInfo) { _producers.TryRemove(streamInfo.Stream, out var producer); producer?.Close(); - if (!_streamInfos.TryGetValue(streamInfo.Stream, out _)) - { - _streamInfos.TryAdd(streamInfo.Stream, streamInfo); - } - + _streamInfos[streamInfo.Stream] = streamInfo; // add the new stream infos await MaybeAddAndGetProducer(streamInfo.Stream).ConfigureAwait(false); } finally @@ -192,7 +190,7 @@ private async Task GetProducerForMessage(Message message) // we should always have a route // but in case of stream KEY the routing could not exist - if (routes is not { Count: > 0 }) + if (routes is not {Count: > 0}) { throw new RouteNotFoundException("No route found for the message to any stream"); } @@ -241,7 +239,7 @@ public async ValueTask Send(List<(ulong, Message)> messages) } else { - aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) })); + aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)})); } } @@ -268,7 +266,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } else { - aggregate.Add((p, new List() { subMessage })); + aggregate.Add((p, new List() {subMessage})); } } @@ -405,7 +403,7 @@ public Task> Route(Message message, List partitions) var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - var r = new List() { partitions[(int)index] }; + var r = new List() {partitions[(int)index]}; return Task.FromResult(r); } @@ -438,8 +436,8 @@ public async Task> Route(Message message, List partitions) var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); _cacheStream[key] = c.Streams; return (from resultStream in c.Streams - where partitions.Contains(resultStream) - select new List() { resultStream }).FirstOrDefault(); + where partitions.Contains(resultStream) + select new List() {resultStream}).FirstOrDefault(); } public KeyRoutingStrategy(Func routingKeyExtractor, From 1007d1a3b814fb486419fadbe6c40ab26e93b76a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 16 Jan 2024 21:59:52 +0100 Subject: [PATCH 5/9] formatting Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawSuperStreamProducer.cs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 74117dd3..7b3da5af 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -129,10 +129,10 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index}) + var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }) .First(i => i.Item == stream).Index; var p = await RawProducer.Create( - _clientParameters with {ClientProvidedName = $"{_config.ClientProvidedName}_{index}"}, + _clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" }, FromStreamConfig(stream), _streamInfos[stream], _logger) @@ -190,7 +190,7 @@ private async Task GetProducerForMessage(Message message) // we should always have a route // but in case of stream KEY the routing could not exist - if (routes is not {Count: > 0}) + if (routes is not { Count: > 0 }) { throw new RouteNotFoundException("No route found for the message to any stream"); } @@ -239,7 +239,7 @@ public async ValueTask Send(List<(ulong, Message)> messages) } else { - aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)})); + aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) })); } } @@ -266,7 +266,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } else { - aggregate.Add((p, new List() {subMessage})); + aggregate.Add((p, new List() { subMessage })); } } @@ -403,7 +403,7 @@ public Task> Route(Message message, List partitions) var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - var r = new List() {partitions[(int)index]}; + var r = new List() { partitions[(int)index] }; return Task.FromResult(r); } @@ -436,8 +436,8 @@ public async Task> Route(Message message, List partitions) var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); _cacheStream[key] = c.Streams; return (from resultStream in c.Streams - where partitions.Contains(resultStream) - select new List() {resultStream}).FirstOrDefault(); + where partitions.Contains(resultStream) + select new List() { resultStream }).FirstOrDefault(); } public KeyRoutingStrategy(Func routingKeyExtractor, From 8d3ff1b526a3625cd12d1ad2db6d9928f6b8213c Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 18 Jan 2024 13:31:52 +0100 Subject: [PATCH 6/9] Add name to the super stream consumer Signed-off-by: Gabriele Santomaggio --- .../RawSuperStreamConsumer.cs | 19 +++++----- Tests/SuperStreamConsumerTests.cs | 38 ++++++++++++++----- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index b3739bb5..8a338e9a 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -23,7 +24,7 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable private readonly SemaphoreSlim _semaphoreSlim = new(1, 1); // Contains the info about the streams (one per partition) - private readonly ConcurrentDictionary _streamInfos; + private readonly IDictionary _streamInfos; private readonly ClientParameters _clientParameters; private readonly ILogger _logger; @@ -53,12 +54,7 @@ private RawSuperStreamConsumer( ) { _config = config; - _streamInfos = new ConcurrentDictionary(streamInfos); - foreach (var keyValuePair in streamInfos) - { - _streamInfos.TryAdd(keyValuePair.Key, keyValuePair.Value); - } - + _streamInfos = streamInfos; _clientParameters = clientParameters; _logger = logger ?? NullLogger.Instance; Info = new ConsumerInfo(_config.SuperStream, _config.Reference); @@ -99,7 +95,7 @@ private RawConsumerConfig FromStreamConfig(string stream) } consumer?.Dispose(); - _streamInfos.TryRemove(stream, out _); + _streamInfos.Remove(stream); if (_config.ConnectionClosedHandler != null) { @@ -121,7 +117,7 @@ private RawConsumerConfig FromStreamConfig(string stream) { _consumers.TryRemove(update.Stream, out var consumer); consumer?.Close(); - _streamInfos.TryRemove(update.Stream, out _); + _streamInfos.Remove(update.Stream); if (_config.MetadataHandler != null) { await _config.MetadataHandler(update).ConfigureAwait(false); @@ -133,8 +129,11 @@ private RawConsumerConfig FromStreamConfig(string stream) private async Task InitConsumer(string stream) { + var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index}) + .First(i => i.Item == stream).Index; + var c = await RawConsumer.Create( - _clientParameters with { ClientProvidedName = _clientParameters.ClientProvidedName }, + _clientParameters with {ClientProvidedName = $"{_clientParameters.ClientProvidedName}_{index}"}, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false); _logger?.LogDebug("Super stream consumer {ConsumerReference} created for Stream {StreamIdentifier}", c.Info, stream); diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index 17cb3f04..b4437a22 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -32,19 +32,26 @@ public async void NumberOfConnectionsShouldBeEqualsToThePartitions() { SystemUtils.ResetSuperStreams(); var system = await StreamSystem.Create(new StreamSystemConfig()); - var connectionName = Guid.NewGuid().ToString(); + var clientProvidedName = Guid.NewGuid().ToString(); var consumer = await system.CreateSuperStreamConsumer( new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) { - ClientProvidedName = connectionName, + ClientProvidedName = clientProvidedName, OffsetSpec = await SystemUtils.OffsetsForSuperStreamConsumer(system, "invoices", new OffsetTypeFirst()) }); Assert.NotNull(consumer); SystemUtils.Wait(); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(connectionName).Result == 3); + + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); Assert.Equal(ResponseCode.Ok, await consumer.Close()); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await system.Close(); } @@ -113,14 +120,22 @@ public async void RemoveOneConnectionIfaStreamIsDeleted() Assert.NotNull(consumer); SystemUtils.Wait(); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 3); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); SystemUtils.HttpDeleteQueue(SystemUtils.InvoicesStream0); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 2); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); SystemUtils.HttpDeleteQueue(SystemUtils.InvoicesStream1); - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); await consumer.Close(); // in this case we don't have any connection anymore since the super stream consumer is closed - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); Assert.Equal(ResponseCode.Ok, await consumer.Close()); await system.Close(); } @@ -382,7 +397,9 @@ Func> consumerUpdateListener SystemUtils.Wait(TimeSpan.FromSeconds(2)); // we kill the connections of the first super stream consumer ( so 3 connections one per stream) - SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProvidedName).Result == 3); + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_2").Result == 1); SystemUtils.Wait(TimeSpan.FromSeconds(3)); // at this point the second consumer should be active and consume the messages // and the consumerUpdateListener should be called and the offset should be restored @@ -399,7 +416,10 @@ Func> consumerUpdateListener } // just to be sure that the connections are killed - SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName(clientProvidedName).Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await consumerSingle.Close(); await system.Close(); } From 6e5b5823123367355a964c092af64864bd65b047 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 18 Jan 2024 14:39:47 +0100 Subject: [PATCH 7/9] Add lock super stream send Add the SuperStream interface Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/IConsumer.cs | 5 ++ RabbitMQ.Stream.Client/IProducer.cs | 5 +- RabbitMQ.Stream.Client/PublicAPI.Shipped.txt | 4 -- .../PublicAPI.Unshipped.txt | 8 +++ .../RawSuperStreamConsumer.cs | 8 +-- .../RawSuperStreamProducer.cs | 48 +++++++++++------ RabbitMQ.Stream.Client/StreamSystem.cs | 4 +- Tests/SuperStreamConsumerTests.cs | 53 ++++++++++++++++++- Tests/SuperStreamProducerTests.cs | 3 +- Tests/Utils.cs | 7 +-- 10 files changed, 109 insertions(+), 36 deletions(-) diff --git a/RabbitMQ.Stream.Client/IConsumer.cs b/RabbitMQ.Stream.Client/IConsumer.cs index 354c2bc8..51e72634 100644 --- a/RabbitMQ.Stream.Client/IConsumer.cs +++ b/RabbitMQ.Stream.Client/IConsumer.cs @@ -7,6 +7,11 @@ namespace RabbitMQ.Stream.Client; +public interface ISuperStreamConsumer : IConsumer +{ + public Task ReconnectPartition(StreamInfo streamInfo); +} + public interface IConsumer : IClosable { public Task StoreOffset(ulong offset); diff --git a/RabbitMQ.Stream.Client/IProducer.cs b/RabbitMQ.Stream.Client/IProducer.cs index 01bde43b..264ab370 100644 --- a/RabbitMQ.Stream.Client/IProducer.cs +++ b/RabbitMQ.Stream.Client/IProducer.cs @@ -8,6 +8,10 @@ namespace RabbitMQ.Stream.Client; +public interface ISuperStreamProducer : IProducer +{ + public Task ReconnectPartition(StreamInfo streamInfo); +} // // Producer interface for sending messages to a stream. // There are different types of producers: @@ -83,7 +87,6 @@ public record ProducerFilter public record IProducerConfig : EntityCommonConfig, INamedEntity { - public string Reference { get; set; } public int MaxInFlight { get; set; } = 1_000; public string ClientProvidedName { get; set; } = "dotnet-stream-raw-producer"; diff --git a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt index e45c7a6c..0c52b59d 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Shipped.txt @@ -720,9 +720,7 @@ RabbitMQ.Stream.Client.StreamSystem RabbitMQ.Stream.Client.StreamSystem.Close() -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateRawConsumer(RabbitMQ.Stream.Client.RawConsumerConfig rawConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateRawProducer(RabbitMQ.Stream.Client.RawProducerConfig rawProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task -RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.CreateStream(RabbitMQ.Stream.Client.StreamSpec spec) -> System.Threading.Tasks.Task -RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.DeleteStream(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.IsClosed.get -> bool RabbitMQ.Stream.Client.StreamSystem.QueryOffset(string reference, string stream) -> System.Threading.Tasks.Task @@ -773,8 +771,6 @@ static RabbitMQ.Stream.Client.LeaderLocator.ClientLocal.get -> RabbitMQ.Stream.C static RabbitMQ.Stream.Client.LeaderLocator.LeastLeaders.get -> RabbitMQ.Stream.Client.LeaderLocator static RabbitMQ.Stream.Client.LeaderLocator.Random.get -> RabbitMQ.Stream.Client.LeaderLocator static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.SequenceReader reader, uint len) -> RabbitMQ.Stream.Client.Message -static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IConsumer -static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.IProducer static RabbitMQ.Stream.Client.Reliable.Consumer.Create(RabbitMQ.Stream.Client.Reliable.ConsumerConfig consumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.Reliable.Producer.Create(RabbitMQ.Stream.Client.Reliable.ProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.StreamCompressionCodecs.GetCompressionCodec(RabbitMQ.Stream.Client.CompressionType compressionType) -> RabbitMQ.Stream.Client.ICompressionCodec diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 09f0461f..9fa27676 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -141,6 +141,10 @@ RabbitMQ.Stream.Client.IProducerConfig.Filter.set -> void RabbitMQ.Stream.Client.IRouting RabbitMQ.Stream.Client.IRouting.CreateClient(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.Broker metaInfoBroker, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.IRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> +RabbitMQ.Stream.Client.ISuperStreamConsumer +RabbitMQ.Stream.Client.ISuperStreamConsumer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.ISuperStreamProducer +RabbitMQ.Stream.Client.ISuperStreamProducer.ReconnectPartition(RabbitMQ.Stream.Client.StreamInfo streamInfo) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.KeyRoutingStrategy RabbitMQ.Stream.Client.KeyRoutingStrategy.KeyRoutingStrategy(System.Func routingKeyExtractor, System.Func> routingKeyQFunc, string superStream) -> void RabbitMQ.Stream.Client.KeyRoutingStrategy.Route(RabbitMQ.Stream.Client.Message message, System.Collections.Generic.List partitions) -> System.Threading.Tasks.Task> @@ -239,6 +243,8 @@ RabbitMQ.Stream.Client.StreamStatsResponse.Statistic.get -> System.Collections.G RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse() -> void RabbitMQ.Stream.Client.StreamStatsResponse.StreamStatsResponse(uint correlationId, RabbitMQ.Stream.Client.ResponseCode responseCode, System.Collections.Generic.IDictionary statistic) -> void RabbitMQ.Stream.Client.StreamStatsResponse.Write(System.Span span) -> int +RabbitMQ.Stream.Client.StreamSystem.CreateRawSuperStreamProducer(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +RabbitMQ.Stream.Client.StreamSystem.CreateSuperStreamConsumer(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamInfo(string streamName) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystem.StreamStats(string stream) -> System.Threading.Tasks.Task RabbitMQ.Stream.Client.StreamSystemConfig.AuthMechanism.get -> RabbitMQ.Stream.Client.AuthMechanism @@ -255,6 +261,8 @@ static RabbitMQ.Stream.Client.Connection.Create(System.Net.EndPoint endpoint, Sy static RabbitMQ.Stream.Client.Message.From(ref System.Buffers.ReadOnlySequence seq, uint len) -> RabbitMQ.Stream.Client.Message static RabbitMQ.Stream.Client.RawConsumer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawConsumerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RawProducer.Create(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.RawProducerConfig config, RabbitMQ.Stream.Client.StreamInfo metaStreamInfo, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task +static RabbitMQ.Stream.Client.RawSuperStreamConsumer.Create(RabbitMQ.Stream.Client.RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamConsumer +static RabbitMQ.Stream.Client.RawSuperStreamProducer.Create(RabbitMQ.Stream.Client.RawSuperStreamProducerConfig rawSuperStreamProducerConfig, System.Collections.Generic.IDictionary streamInfos, RabbitMQ.Stream.Client.ClientParameters clientParameters, Microsoft.Extensions.Logging.ILogger logger = null) -> RabbitMQ.Stream.Client.ISuperStreamProducer static RabbitMQ.Stream.Client.Reliable.DeduplicatingProducer.Create(RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig producerConfig, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupLeaderConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task static RabbitMQ.Stream.Client.RoutingHelper.LookupRandomConnection(RabbitMQ.Stream.Client.ClientParameters clientParameters, RabbitMQ.Stream.Client.StreamInfo metaDataInfo, RabbitMQ.Stream.Client.ConnectionsPool pool, Microsoft.Extensions.Logging.ILogger logger = null) -> System.Threading.Tasks.Task diff --git a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs index 8a338e9a..f4f2b9ad 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamConsumer.cs @@ -13,7 +13,7 @@ namespace RabbitMQ.Stream.Client; -public class RawSuperStreamConsumer : IConsumer, IDisposable +public class RawSuperStreamConsumer : ISuperStreamConsumer, IDisposable { // ConcurrentDictionary because the consumer can be closed from another thread // The send operations will check if the producer exists and if not it will be created @@ -36,7 +36,7 @@ public class RawSuperStreamConsumer : IConsumer, IDisposable /// /// /// - public static IConsumer Create( + public static ISuperStreamConsumer Create( RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, IDictionary streamInfos, ClientParameters clientParameters, @@ -129,11 +129,11 @@ private RawConsumerConfig FromStreamConfig(string stream) private async Task InitConsumer(string stream) { - var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index}) + var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }) .First(i => i.Item == stream).Index; var c = await RawConsumer.Create( - _clientParameters with {ClientProvidedName = $"{_clientParameters.ClientProvidedName}_{index}"}, + _clientParameters with { ClientProvidedName = $"{_clientParameters.ClientProvidedName}_{index}" }, FromStreamConfig(stream), _streamInfos[stream], _logger).ConfigureAwait(false); _logger?.LogDebug("Super stream consumer {ConsumerReference} created for Stream {StreamIdentifier}", c.Info, stream); diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 7b3da5af..777a6f7d 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -23,7 +23,7 @@ namespace RabbitMQ.Stream.Client; /// When a message is sent to a stream, the producer will be selected based on the stream name and the partition key. /// SuperStreamProducer uses lazy initialization for the producers, when it starts there are no producers until the first message is sent. /// -public class RawSuperStreamProducer : IProducer, IDisposable +public class RawSuperStreamProducer : ISuperStreamProducer, IDisposable { private bool _disposed; @@ -46,7 +46,7 @@ public class RawSuperStreamProducer : IProducer, IDisposable private readonly ILogger _logger; private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1); - public static IProducer Create( + public static ISuperStreamProducer Create( RawSuperStreamProducerConfig rawSuperStreamProducerConfig, IDictionary streamInfos, ClientParameters clientParameters, @@ -129,10 +129,10 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }) + var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index}) .First(i => i.Item == stream).Index; var p = await RawProducer.Create( - _clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" }, + _clientParameters with {ClientProvidedName = $"{_config.ClientProvidedName}_{index}"}, FromStreamConfig(stream), _streamInfos[stream], _logger) @@ -190,7 +190,7 @@ private async Task GetProducerForMessage(Message message) // we should always have a route // but in case of stream KEY the routing could not exist - if (routes is not { Count: > 0 }) + if (routes is not {Count: > 0}) { throw new RouteNotFoundException("No route found for the message to any stream"); } @@ -239,13 +239,21 @@ public async ValueTask Send(List<(ulong, Message)> messages) } else { - aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) })); + aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)})); } } - foreach (var (producer, list) in aggregate) + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + foreach (var (producer, list) in aggregate) + { + await producer.Send(list).ConfigureAwait(false); + } + } + finally { - await producer.Send(list).ConfigureAwait(false); + _semaphoreSlim.Release(); } } @@ -266,15 +274,23 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } else { - aggregate.Add((p, new List() { subMessage })); + aggregate.Add((p, new List() {subMessage})); } } - // Here we send the messages to the right producer - // sub aggregate is a list of messages that have to be sent to the same producer - foreach (var (producer, messages) in aggregate) + await _semaphoreSlim.WaitAsync().ConfigureAwait(false); + try + { + // Here we send the messages to the right producer + // sub aggregate is a list of messages that have to be sent to the same producer + foreach (var (producer, messages) in aggregate) + { + await producer.Send(publishingId, messages, compressionType).ConfigureAwait(false); + } + } + finally { - await producer.Send(publishingId, messages, compressionType).ConfigureAwait(false); + _semaphoreSlim.Release(); } } @@ -403,7 +419,7 @@ public Task> Route(Message message, List partitions) var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - var r = new List() { partitions[(int)index] }; + var r = new List() {partitions[(int)index]}; return Task.FromResult(r); } @@ -436,8 +452,8 @@ public async Task> Route(Message message, List partitions) var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); _cacheStream[key] = c.Streams; return (from resultStream in c.Streams - where partitions.Contains(resultStream) - select new List() { resultStream }).FirstOrDefault(); + where partitions.Contains(resultStream) + select new List() {resultStream}).FirstOrDefault(); } public KeyRoutingStrategy(Func routingKeyExtractor, diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 4e3763e8..9a7ee8ec 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -162,7 +162,7 @@ private static void CheckLeader(StreamInfo metaStreamInfo) } } - public async Task CreateRawSuperStreamProducer( + public async Task CreateRawSuperStreamProducer( RawSuperStreamProducerConfig rawSuperStreamProducerConfig, ILogger logger = null) { await MayBeReconnectLocator().ConfigureAwait(false); @@ -222,7 +222,7 @@ public async Task QueryPartition(string superStream) return partitions.Streams; } - public async Task CreateSuperStreamConsumer( + public async Task CreateSuperStreamConsumer( RawSuperStreamConsumerConfig rawSuperStreamConsumerConfig, ILogger logger = null) { diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs index b4437a22..68e87e5c 100644 --- a/Tests/SuperStreamConsumerTests.cs +++ b/Tests/SuperStreamConsumerTests.cs @@ -43,7 +43,7 @@ public async void NumberOfConnectionsShouldBeEqualsToThePartitions() Assert.NotNull(consumer); SystemUtils.Wait(); - + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); @@ -140,6 +140,55 @@ public async void RemoveOneConnectionIfaStreamIsDeleted() await system.Close(); } + [Fact] + public async void SingleConsumerReconnectInCaseOfKillingConnection() + { + SystemUtils.ResetSuperStreams(); + var system = await StreamSystem.Create(new StreamSystemConfig()); + var clientProvidedName = Guid.NewGuid().ToString(); + + var configuration = new RawSuperStreamConsumerConfig(SystemUtils.InvoicesExchange) + { + ClientProvidedName = clientProvidedName, + OffsetSpec = + await SystemUtils.OffsetsForSuperStreamConsumer(system, "invoices", new OffsetTypeFirst()) + }; + + var consumer = await system.CreateSuperStreamConsumer(configuration); + var completed = new TaskCompletionSource(); + configuration.ConnectionClosedHandler = async (reason, stream) => + { + if (reason == ConnectionClosedReason.Unexpected) + { + SystemUtils.Wait(); + await consumer.ReconnectPartition( + await system.StreamInfo(stream).ConfigureAwait(false) + ); + SystemUtils.Wait(); + completed.SetResult(true); + } + }; + + Assert.NotNull(consumer); + SystemUtils.Wait(); + + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + + SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1); + + completed.Task.Wait(); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 1); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 1); + Assert.Equal(ResponseCode.Ok, await consumer.Close()); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_0").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0); + SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0); + await system.Close(); + } + [Fact] public async void ValidateSuperStreamConsumer() { @@ -225,7 +274,7 @@ public async void MoreConsumersNumberOfMessagesConsumedShouldBeEqualsToPublished const int NumberOfMessages = 20; var system = await StreamSystem.Create(new StreamSystemConfig()); var publishToSuperStreamTask = - SystemUtils.PublishMessagesSuperStream(system, "invoices", NumberOfMessages, "", _testOutputHelper); + SystemUtils.PublishMessagesSuperStream(system, SystemUtils.InvoicesExchange, NumberOfMessages, "", _testOutputHelper); if (await Task.WhenAny(publishToSuperStreamTask, Task.Delay(20000)) != publishToSuperStreamTask) { Assert.Fail("timeout waiting to publish messages"); diff --git a/Tests/SuperStreamProducerTests.cs b/Tests/SuperStreamProducerTests.cs index b337dac7..47249ef3 100644 --- a/Tests/SuperStreamProducerTests.cs +++ b/Tests/SuperStreamProducerTests.cs @@ -424,7 +424,6 @@ public async void SendMessageToSuperStreamRecreateConnectionsIfKilled() // the method ReconnectPartition is used to reconnect the partition var system = await StreamSystem.Create(new StreamSystemConfig()); var clientName = Guid.NewGuid().ToString(); - RawSuperStreamProducer streamProducer = null; var c = new RawSuperStreamProducerConfig(SystemUtils.InvoicesExchange) { @@ -432,7 +431,7 @@ public async void SendMessageToSuperStreamRecreateConnectionsIfKilled() ClientProvidedName = clientName, }; var completed = new TaskCompletionSource(); - streamProducer = (RawSuperStreamProducer)await system.CreateRawSuperStreamProducer(c); + var streamProducer = await system.CreateRawSuperStreamProducer(c); c.ConnectionClosedHandler = async (reason, stream) => { var streamInfo = await system.StreamInfo(stream); diff --git a/Tests/Utils.cs b/Tests/Utils.cs index de8d2d24..49f2194d 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -200,13 +200,10 @@ public static async Task PublishMessagesSuperStream(StreamSystem system, string Routing = message1 => message1.Properties.MessageId.ToString(), ConfirmHandler = _ => { - count++; - if (count != numberOfMessages) + if (Interlocked.Increment(ref count) == numberOfMessages) { - return; + testPassed.SetResult(count); } - - testPassed.SetResult(count); } }); From d7b19a394c11c1b8c68475df79cfa5f9b24676a0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 18 Jan 2024 15:02:55 +0100 Subject: [PATCH 8/9] formatting Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/RawSuperStreamProducer.cs | 16 ++++++++-------- .../Reliable/ProducerFactory.cs | 9 ++++++++- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs index 777a6f7d..ddcd3a5c 100644 --- a/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs +++ b/RabbitMQ.Stream.Client/RawSuperStreamProducer.cs @@ -129,10 +129,10 @@ private RawProducerConfig FromStreamConfig(string stream) // The producer is created on demand when a message is sent to a stream private async Task InitProducer(string stream) { - var index = _streamInfos.Keys.Select((item, index) => new {Item = item, Index = index}) + var index = _streamInfos.Keys.Select((item, index) => new { Item = item, Index = index }) .First(i => i.Item == stream).Index; var p = await RawProducer.Create( - _clientParameters with {ClientProvidedName = $"{_config.ClientProvidedName}_{index}"}, + _clientParameters with { ClientProvidedName = $"{_config.ClientProvidedName}_{index}" }, FromStreamConfig(stream), _streamInfos[stream], _logger) @@ -190,7 +190,7 @@ private async Task GetProducerForMessage(Message message) // we should always have a route // but in case of stream KEY the routing could not exist - if (routes is not {Count: > 0}) + if (routes is not { Count: > 0 }) { throw new RouteNotFoundException("No route found for the message to any stream"); } @@ -239,7 +239,7 @@ public async ValueTask Send(List<(ulong, Message)> messages) } else { - aggregate.Add((p, new List<(ulong, Message)>() {(subMessage.Item1, subMessage.Item2)})); + aggregate.Add((p, new List<(ulong, Message)>() { (subMessage.Item1, subMessage.Item2) })); } } @@ -274,7 +274,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, } else { - aggregate.Add((p, new List() {subMessage})); + aggregate.Add((p, new List() { subMessage })); } } @@ -419,7 +419,7 @@ public Task> Route(Message message, List partitions) var key = _routingKeyExtractor(message); var hash = new Murmur32ManagedX86(Seed).ComputeHash(Encoding.UTF8.GetBytes(key)); var index = BitConverter.ToUInt32(hash, 0) % (uint)partitions.Count; - var r = new List() {partitions[(int)index]}; + var r = new List() { partitions[(int)index] }; return Task.FromResult(r); } @@ -452,8 +452,8 @@ public async Task> Route(Message message, List partitions) var c = await _routingKeyQFunc(_superStream, key).ConfigureAwait(false); _cacheStream[key] = c.Streams; return (from resultStream in c.Streams - where partitions.Contains(resultStream) - select new List() {resultStream}).FirstOrDefault(); + where partitions.Contains(resultStream) + select new List() { resultStream }).FirstOrDefault(); } public KeyRoutingStrategy(Func routingKeyExtractor, diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index afd4027f..2671c467 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -1,6 +1,9 @@ // This source code is dual-licensed under the Apache License, version // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. + +using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -18,6 +21,7 @@ public abstract class ProducerFactory : ReliableBase protected IProducer _producer; protected ProducerConfig _producerConfig; protected ConfirmationPipe _confirmationPipe; + private static readonly ManualResetEvent s_reconnect = new ManualResetEvent(false); protected async Task CreateProducer(bool boot) { @@ -46,6 +50,8 @@ private async Task SuperStreamProducer(bool boot) Filter = _producerConfig.Filter, ConnectionClosedHandler = async (closeReason, partitionStream) => { + s_reconnect.WaitOne(TimeSpan.FromSeconds(1)); + if (closeReason == ConnectionClosedReason.Normal) { BaseLogger.LogDebug("{Identity} is closed normally", ToString()); @@ -55,7 +61,8 @@ private async Task SuperStreamProducer(bool boot) var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r) .ConfigureAwait(false); - + s_reconnect.Set(); + s_reconnect.Reset(); }, MetadataHandler = async update => { From 8e8dc04abe00e020bd4cfb862bee88eb6b32d460 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 18 Jan 2024 15:04:37 +0100 Subject: [PATCH 9/9] formatting Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs | 7 ------- 1 file changed, 7 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 2671c467..557797dd 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -2,8 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. -using System; -using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; @@ -21,7 +19,6 @@ public abstract class ProducerFactory : ReliableBase protected IProducer _producer; protected ProducerConfig _producerConfig; protected ConfirmationPipe _confirmationPipe; - private static readonly ManualResetEvent s_reconnect = new ManualResetEvent(false); protected async Task CreateProducer(bool boot) { @@ -50,8 +47,6 @@ private async Task SuperStreamProducer(bool boot) Filter = _producerConfig.Filter, ConnectionClosedHandler = async (closeReason, partitionStream) => { - s_reconnect.WaitOne(TimeSpan.FromSeconds(1)); - if (closeReason == ConnectionClosedReason.Normal) { BaseLogger.LogDebug("{Identity} is closed normally", ToString()); @@ -61,8 +56,6 @@ private async Task SuperStreamProducer(bool boot) var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition; await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r) .ConfigureAwait(false); - s_reconnect.Set(); - s_reconnect.Reset(); }, MetadataHandler = async update => {