diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt index 588e3875..835207fe 100644 --- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt +++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt @@ -88,6 +88,8 @@ RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig RabbitMQ.Stream.Client.Reliable.DeduplicatingProducerConfig.DeduplicatingProducerConfig(RabbitMQ.Stream.Client.StreamSystem streamSystem, string stream, string reference) -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.get -> RabbitMQ.Stream.Client.ProducerFilter RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void +RabbitMQ.Stream.Client.Reliable.ProducerConfig.OnConnectionClosed.get -> System.Func +RabbitMQ.Stream.Client.Reliable.ProducerConfig.OnConnectionClosed.set -> void RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs index 7b3cff24..dc9dc285 100644 --- a/RabbitMQ.Stream.Client/Reliable/Producer.cs +++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs @@ -104,6 +104,8 @@ public TimeSpan TimeoutMessageAfter } } + public Func OnConnectionClosed { get; set; } = null; + public ProducerConfig(StreamSystem streamSystem, string stream) : base(streamSystem, stream) { } diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs index 48e82cef..04c0c97f 100644 --- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs +++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs @@ -62,6 +62,11 @@ private async Task SuperStreamProducer() private async Task StandardProducer() { + var onConnectionClosed = _producerConfig.OnConnectionClosed ?? (async _ => + { + await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false); + }); + return await _producerConfig.StreamSystem.CreateRawProducer(new RawProducerConfig(_producerConfig.Stream) { ClientProvidedName = _producerConfig.ClientProvidedName, @@ -80,10 +85,7 @@ private async Task StandardProducer() _producerConfig.StreamSystem).WaitAsync(CancellationToken.None); }); }, - ConnectionClosedHandler = async _ => - { - await TryToReconnect(_producerConfig.ReconnectStrategy).ConfigureAwait(false); - }, + ConnectionClosedHandler = onConnectionClosed, ConfirmHandler = confirmation => { var confirmationStatus = confirmation.Code switch