diff --git a/Directory.Packages.props b/Directory.Packages.props
index 665a5df3..5279b389 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -21,18 +21,16 @@
-
-
-
-
+
+
\ No newline at end of file
diff --git a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
index 7e65f49a..7a6a3b1e 100644
--- a/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
@@ -184,6 +184,13 @@ RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
@@ -209,18 +216,35 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
-RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus status) -> void
+RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason reason, string partition = null) -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.set -> void
+RabbitMQ.Stream.Client.Reliable.ReliableConfig.OnStatusChanged(RabbitMQ.Stream.Client.Reliable.StatusInfo statusInfo) -> void
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.get -> RabbitMQ.Stream.Client.Reliable.IReconnectStrategy
RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrategy.set -> void
+RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChanged -> RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
+RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Initialization = 0 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Open = 1 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
-RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Reconnecting = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
+RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Reconnection = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
+RabbitMQ.Stream.Client.Reliable.StatusInfo
+RabbitMQ.Stream.Client.Reliable.StatusInfo.From.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
+RabbitMQ.Stream.Client.Reliable.StatusInfo.From.init -> void
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.get -> string
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.init -> void
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.get -> string
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.init -> void
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.get -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.init -> void
+RabbitMQ.Stream.Client.Reliable.StatusInfo.StatusInfo(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus From, RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus To, string Stream, string Identifier, string Partition, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason Reason = RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None) -> void
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.get -> string
+RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.init -> void
+RabbitMQ.Stream.Client.Reliable.StatusInfo.To.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
+RabbitMQ.Stream.Client.Reliable.StatusInfo.To.init -> void
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.Reliable.SuperStreamConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.RouteNotFoundException
diff --git a/RabbitMQ.Stream.Client/Reliable/Consumer.cs b/RabbitMQ.Stream.Client/Reliable/Consumer.cs
index eb958fd1..f479dcfb 100644
--- a/RabbitMQ.Stream.Client/Reliable/Consumer.cs
+++ b/RabbitMQ.Stream.Client/Reliable/Consumer.cs
@@ -172,7 +172,7 @@ public static async Task Create(ConsumerConfig consumerConfig, ILogger
consumerConfig.ReconnectStrategy ??= new BackOffReconnectStrategy(logger);
consumerConfig.ResourceAvailableReconnectStrategy ??= new ResourceAvailableBackOffReconnectStrategy(logger);
var rConsumer = new Consumer(consumerConfig, logger);
- await rConsumer.Init(consumerConfig.ReconnectStrategy, consumerConfig.ResourceAvailableReconnectStrategy)
+ await rConsumer.Init(consumerConfig)
.ConfigureAwait(false);
return rConsumer;
}
@@ -204,11 +204,11 @@ public override async Task Close()
{
if (_status == ReliableEntityStatus.Initialization)
{
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
return;
}
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
await CloseEntity().ConfigureAwait(false);
_logger?.LogDebug("Consumer {Identity} closed", ToString());
}
diff --git a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
index b53d136f..5188cf64 100644
--- a/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
+++ b/RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
@@ -62,15 +62,18 @@ private async Task StandardConsumer(bool boot)
{
if (closeReason == ConnectionClosedReason.Normal)
{
+ // we don't update the status here since it happens when Close() is called in a normal way
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
return;
}
- await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream).ConfigureAwait(false);
+ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
+ ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
},
MetadataHandler = async _ =>
{
- await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream).ConfigureAwait(false);
+ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
+ ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
},
MessageHandler = async (consumer, ctx, message) =>
{
@@ -127,19 +130,22 @@ private async Task SuperConsumer(bool boot)
await RandomWait().ConfigureAwait(false);
if (closeReason == ConnectionClosedReason.Normal)
{
+ // we don't update the status here since it happens when Close() is called in a normal way
BaseLogger.LogInformation("{Identity} is closed normally", ToString());
return;
}
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
- await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r)
+ await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
+ ChangeStatusReason.UnexpectedlyDisconnected)
.ConfigureAwait(false);
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
- await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r)
+ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
+ ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
},
MessageHandler = async (partitionStream, consumer, ctx, message) =>
diff --git a/RabbitMQ.Stream.Client/Reliable/Producer.cs b/RabbitMQ.Stream.Client/Reliable/Producer.cs
index e9d612a8..ac0c14a8 100644
--- a/RabbitMQ.Stream.Client/Reliable/Producer.cs
+++ b/RabbitMQ.Stream.Client/Reliable/Producer.cs
@@ -159,7 +159,7 @@ public static async Task Create(ProducerConfig producerConfig, ILogger
producerConfig.ReconnectStrategy ??= new BackOffReconnectStrategy(logger);
producerConfig.ResourceAvailableReconnectStrategy ??= new ResourceAvailableBackOffReconnectStrategy(logger);
var rProducer = new Producer(producerConfig, logger);
- await rProducer.Init(producerConfig.ReconnectStrategy, producerConfig.ResourceAvailableReconnectStrategy)
+ await rProducer.Init(producerConfig)
.ConfigureAwait(false);
return rProducer;
}
@@ -201,11 +201,11 @@ public override async Task Close()
{
if (ReliableEntityStatus.Initialization == _status)
{
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
return;
}
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
diff --git a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
index d3c980cb..a64065d4 100644
--- a/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
+++ b/RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
@@ -34,7 +34,6 @@ private async Task SuperStreamProducer(bool boot)
{
if (boot)
{
-
return await _producerConfig.StreamSystem.CreateRawSuperStreamProducer(
new RawSuperStreamProducerConfig(_producerConfig.Stream)
{
@@ -56,14 +55,16 @@ private async Task SuperStreamProducer(bool boot)
}
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
- await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r)
+ await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
+ ChangeStatusReason.UnexpectedlyDisconnected)
.ConfigureAwait(false);
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
- await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r)
+ await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
+ ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
},
ConfirmHandler = confirmationHandler =>
@@ -104,7 +105,8 @@ private async Task StandardProducer()
MetadataHandler = async _ =>
{
await RandomWait().ConfigureAwait(false);
- await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream).ConfigureAwait(false);
+ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
+ ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
},
ConnectionClosedHandler = async (closeReason) =>
{
@@ -115,7 +117,8 @@ private async Task StandardProducer()
return;
}
- await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream).ConfigureAwait(false);
+ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
+ ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
},
ConfirmHandler = confirmation =>
{
diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
index 23f0be74..be9b2025 100644
--- a/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
+++ b/RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
@@ -9,6 +9,34 @@
namespace RabbitMQ.Stream.Client.Reliable;
+///
+/// StatusInfo is the information about the change status of the ReliableEntity
+///
+/// The previous entity status
+/// The new status
+/// Stream or SuperSuper affected
+/// The Entity Identifier
+/// Super stream partition. Valid only for SuperStream else is empty
+/// The reason why the status changed
+public record StatusInfo(
+ ReliableEntityStatus From, // init
+ ReliableEntityStatus To, // open
+ string Stream,
+ string Identifier,
+ string Partition,
+ ChangeStatusReason Reason = ChangeStatusReason.None
+);
+
+public enum ChangeStatusReason
+{
+ None,
+ UnexpectedlyDisconnected,
+ MetaDataUpdate,
+ ClosedByUser,
+ ClosedByStrategyPolicy,
+ BoolFailure
+}
+
public record ReliableConfig
{
///
@@ -47,6 +75,15 @@ protected ReliableConfig(StreamSystem streamSystem, string stream)
Stream = stream;
StreamSystem = streamSystem;
}
+
+ public delegate void StatusChangedHandler(StatusInfo statusInfo);
+
+ public event StatusChangedHandler StatusChanged;
+
+ protected internal void OnStatusChanged(StatusInfo statusInfo)
+ {
+ StatusChanged?.Invoke(statusInfo);
+ }
}
///
@@ -54,10 +91,10 @@ protected ReliableConfig(StreamSystem streamSystem, string stream)
///
public enum ReliableEntityStatus
{
- Initialization,// the entity is initializing
+ Initialization, // the entity is initializing
Open, // the entity is open and ready to use
- Reconnecting,// the entity is reconnecting but still can be used
- Closed,// the entity is closed and cannot be used anymore
+ Reconnection, // the entity is in reconnection but it is still considered open
+ Closed, // the entity is closed and cannot be used anymore
}
///
@@ -69,17 +106,27 @@ public abstract class ReliableBase
protected readonly SemaphoreSlim SemaphoreSlim = new(1, 1);
private readonly object _lock = new();
protected ReliableEntityStatus _status = ReliableEntityStatus.Initialization;
+ protected abstract ILogger BaseLogger { get; }
+ private ReliableConfig _reliableConfig;
protected static async Task RandomWait()
{
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
}
- protected void UpdateStatus(ReliableEntityStatus status)
+ protected void UpdateStatus(ReliableEntityStatus newStatus,
+ ChangeStatusReason reason, string partition = null)
{
+ var oldStatus = _status;
lock (_lock)
{
- _status = status;
+ _status = newStatus;
+ if (oldStatus != newStatus)
+ {
+ _reliableConfig.OnStatusChanged(new StatusInfo(oldStatus, newStatus,
+ _reliableConfig.Stream,
+ _reliableConfig.Identifier, partition, reason));
+ }
}
}
@@ -95,20 +142,13 @@ private bool IsValidStatus()
{
lock (_lock)
{
- return _status is ReliableEntityStatus.Open or ReliableEntityStatus.Reconnecting
- or ReliableEntityStatus.Initialization;
+ return _status is not ReliableEntityStatus.Closed;
}
}
- protected abstract ILogger BaseLogger { get; }
- private IReconnectStrategy _reconnectStrategy;
- private IReconnectStrategy _resourceAvailableReconnectStrategy;
-
- internal async Task Init(IReconnectStrategy reconnectStrategy,
- IReconnectStrategy resourceAvailableReconnectStrategy)
+ internal async Task Init(ReliableConfig reliableConfig)
{
- _reconnectStrategy = reconnectStrategy;
- _resourceAvailableReconnectStrategy = resourceAvailableReconnectStrategy;
+ _reliableConfig = reliableConfig;
await Init(true).ConfigureAwait(false);
}
@@ -122,7 +162,7 @@ private async Task MaybeInit(bool boot)
// else there are two ways:
// - the exception is a known exception and the client will try to reconnect
// - the exception is not a known exception and the client will throw the exception
- UpdateStatus(ReliableEntityStatus.Open);
+ UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None);
}
catch (Exception e)
{
@@ -131,7 +171,7 @@ private async Task MaybeInit(bool boot)
BaseLogger.LogError("{Identity} Error during the first boot {EMessage}",
ToString(), e.Message);
// if it is the first boot we don't need to reconnect
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.BoolFailure);
throw;
}
@@ -140,7 +180,9 @@ private async Task MaybeInit(bool boot)
}
if (reconnect)
+ {
await MaybeReconnect().ConfigureAwait(false);
+ }
}
//
@@ -158,7 +200,7 @@ private async Task Init(bool boot)
// each time that the client is initialized, we need to reset the status
// if we hare here it means that the entity is not open for some reason like:
// first time initialization or reconnect due of a IsAKnownException
- UpdateStatus(ReliableEntityStatus.Initialization);
+ UpdateStatus(ReliableEntityStatus.Initialization, ChangeStatusReason.None);
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
try
@@ -207,13 +249,14 @@ private async Task CheckIfStreamIsAvailable(string stream, StreamSystem sy
available += " and has a valid leader";
}
- await _resourceAvailableReconnectStrategy.WhenConnected($"{stream} for {ToString()} is {available}")
+ await _reliableConfig.ResourceAvailableReconnectStrategy
+ .WhenConnected($"{stream} for {ToString()} is {available}")
.ConfigureAwait(false);
break;
}
catch (Exception e)
{
- tryAgain = await _resourceAvailableReconnectStrategy
+ tryAgain = await _reliableConfig.ResourceAvailableReconnectStrategy
.WhenDisconnected($"Stream {stream} for {ToString()}. Error: {e.Message} ").ConfigureAwait(false);
}
}
@@ -240,22 +283,21 @@ private async Task CheckIfStreamIsAvailable(string stream, StreamSystem sy
//
private async Task MaybeReconnect()
{
- var reconnect = await _reconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false);
+ var reconnect = await _reliableConfig.ReconnectStrategy.WhenDisconnected(ToString()).ConfigureAwait(false);
if (!reconnect)
{
BaseLogger.LogDebug("{Identity} is closed due of reconnect strategy", ToString());
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByStrategyPolicy);
return;
}
switch (IsOpen())
{
case true:
- UpdateStatus(ReliableEntityStatus.Reconnecting);
await MaybeInit(false).ConfigureAwait(false);
break;
case false:
- if (CompareStatus(ReliableEntityStatus.Reconnecting))
+ if (CompareStatus(ReliableEntityStatus.Reconnection))
{
BaseLogger.LogDebug("{Identity} is in Reconnecting", ToString());
}
@@ -267,22 +309,21 @@ private async Task MaybeReconnect()
private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info,
Func reconnectPartitionFunc)
{
- var reconnect = await _reconnectStrategy
+ var reconnect = await _reliableConfig.ReconnectStrategy
.WhenDisconnected($"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false);
if (!reconnect)
{
BaseLogger.LogDebug("{Identity} partition is closed due of reconnect strategy", ToString());
- UpdateStatus(ReliableEntityStatus.Closed);
+ UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByStrategyPolicy, streamInfo.Stream);
return;
}
try
{
- UpdateStatus(ReliableEntityStatus.Reconnecting);
await reconnectPartitionFunc(streamInfo).ConfigureAwait(false);
- UpdateStatus(ReliableEntityStatus.Open);
- await _reconnectStrategy.WhenConnected(
+ UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None, streamInfo.Stream);
+ await _reliableConfig.ReconnectStrategy.WhenConnected(
$"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false);
}
catch (Exception e)
@@ -321,11 +362,14 @@ private void LogException(Exception exception)
/// Stream System
/// Partition Stream
/// Function to reconnect the partition
+ /// The reason why the connection is closed (Metadata update od disconnection)
internal async Task OnEntityClosed(StreamSystem system, string stream,
- Func reconnectPartitionFunc)
+ Func reconnectPartitionFunc, ChangeStatusReason reason)
{
var streamExists = false;
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
+ UpdateStatus(ReliableEntityStatus.Reconnection, reason,
+ stream);
try
{
streamExists = await CheckIfStreamIsAvailable(stream, system)
@@ -352,10 +396,12 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
///
/// Stream system
/// Stream
- internal async Task OnEntityClosed(StreamSystem system, string stream)
+ /// The reason why the connection is closed (Metadata update od disconnection)
+ internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeStatusReason reason)
{
var streamExists = false;
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
+ UpdateStatus(ReliableEntityStatus.Reconnection, reason, stream);
try
{
streamExists = await CheckIfStreamIsAvailable(stream, system)
@@ -385,8 +431,7 @@ public bool IsOpen()
{
lock (_lock)
{
- return _status is ReliableEntityStatus.Open or ReliableEntityStatus.Reconnecting
- or ReliableEntityStatus.Initialization;
+ return _status is not ReliableEntityStatus.Closed;
}
}
}
diff --git a/RabbitMQ.Stream.Client/RoutingClient.cs b/RabbitMQ.Stream.Client/RoutingClient.cs
index 9a22a717..11c71afa 100644
--- a/RabbitMQ.Stream.Client/RoutingClient.cs
+++ b/RabbitMQ.Stream.Client/RoutingClient.cs
@@ -67,7 +67,11 @@ internal static async Task LookupConnection(
// In this case we just return the node (leader for producer, random for consumer)
// since there is not load balancer configuration
- return await routing.CreateClient(clientParameters with { Endpoint = endPointNoLb }, broker, logger)
+ return await routing.CreateClient(clientParameters with
+ {
+ Endpoint = endPointNoLb,
+ ClientProvidedName = clientParameters.ClientProvidedName
+ }, broker, logger)
.ConfigureAwait(false);
}
@@ -75,7 +79,11 @@ internal static async Task LookupConnection(
// so there is a load-balancer or proxy we need to get the right connection
// as first we try with the first node given from the LB
var endPoint = clientParameters.AddressResolver.EndPoint;
- var client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, broker, logger)
+ var client = await routing.CreateClient(clientParameters with
+ {
+ Endpoint = endPoint,
+ ClientProvidedName = clientParameters.ClientProvidedName
+ }, broker, logger)
.ConfigureAwait(false);
var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
@@ -87,7 +95,11 @@ internal static async Task LookupConnection(
attemptNo++;
await client.Close("advertised_host or advertised_port doesn't match").ConfigureAwait(false);
- client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, broker, logger)
+ client = await routing.CreateClient(clientParameters with
+ {
+ Endpoint = endPoint,
+ ClientProvidedName = clientParameters.ClientProvidedName
+ }, broker, logger)
.ConfigureAwait(false);
advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs
index 6a998d8b..ab04e159 100644
--- a/RabbitMQ.Stream.Client/StreamSystem.cs
+++ b/RabbitMQ.Stream.Client/StreamSystem.cs
@@ -182,20 +182,20 @@ public async Task CreateRawSuperStreamProducer(
throw new CreateProducerException($"producer could not be created code: {partitions.ResponseCode}");
}
- await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
- try
+ IDictionary streamInfos = new Dictionary();
+ foreach (var partitionsStream in partitions.Streams)
{
- IDictionary streamInfos = new Dictionary();
- foreach (var partitionsStream in partitions.Streams)
- {
- streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
- }
+ streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
+ }
- foreach (var (_, value) in streamInfos)
- {
- ClientExceptions.CheckLeader(value);
- }
+ foreach (var (_, value) in streamInfos)
+ {
+ ClientExceptions.CheckLeader(value);
+ }
+ await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
+ try
+ {
var r = RawSuperStreamProducer.Create(rawSuperStreamProducerConfig,
streamInfos,
_clientParameters with { ClientProvidedName = rawSuperStreamProducerConfig.ClientProvidedName },
@@ -247,15 +247,15 @@ public async Task CreateSuperStreamConsumer(
partitions.ResponseCode);
}
+ IDictionary streamInfos = new Dictionary();
+ foreach (var partitionsStream in partitions.Streams)
+ {
+ streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
+ }
+
await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
try
{
- IDictionary streamInfos = new Dictionary();
- foreach (var partitionsStream in partitions.Streams)
- {
- streamInfos[partitionsStream] = await StreamInfo(partitionsStream).ConfigureAwait(false);
- }
-
var s = RawSuperStreamConsumer.Create(rawSuperStreamConsumerConfig,
streamInfos,
_clientParameters with { ClientProvidedName = rawSuperStreamConsumerConfig.ClientProvidedName },
@@ -287,9 +287,9 @@ public async Task CreateRawProducer(RawProducerConfig rawProducerConf
ClientExceptions.CheckLeader(metaStreamInfo);
+ await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
try
{
- await _semClientProvidedName.WaitAsync().ConfigureAwait(false);
rawProducerConfig.Pool = PoolProducers;
var s = _clientParameters with { ClientProvidedName = rawProducerConfig.ClientProvidedName };
diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs
index e750633d..b3a24e32 100644
--- a/Tests/ReliableTests.cs
+++ b/Tests/ReliableTests.cs
@@ -94,26 +94,28 @@ public async void ConfirmRProducerMessages()
var testPassed = new TaskCompletionSource();
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
var count = 0;
- var producer = await Producer.Create(
- new ProducerConfig(system, stream)
+ var config = new ProducerConfig(system, stream)
+ {
+ MessagesBufferSize = 150,
+ Identifier = "my_producer_0874",
+ ConfirmationHandler = _ =>
{
- MessagesBufferSize = 150,
- Identifier = "my_producer_0874",
- ConfirmationHandler = _ =>
+ if (Interlocked.Increment(ref count) ==
+ 5 + // first five messages iteration
+ 5 + // second five messages iteration with compression enabled
+ 2 // batch send iteration since the messages list contains two messages
+ )
{
- if (Interlocked.Increment(ref count) ==
- 5 + // first five messages iteration
- 5 + // second five messages iteration with compression enabled
- 2 // batch send iteration since the messages list contains two messages
- )
- {
- testPassed.SetResult(true);
- }
-
- return Task.CompletedTask;
+ testPassed.SetResult(true);
}
+
+ return Task.CompletedTask;
}
- );
+ };
+ var statusInfoReceived = new List();
+ config.StatusChanged += (status) => { statusInfoReceived.Add(status); };
+ var producer = await Producer.Create(config);
+
for (var i = 0; i < 5; i++)
{
await producer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
@@ -136,6 +138,15 @@ public async void ConfirmRProducerMessages()
new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
Assert.Equal("my_producer_0874", producer.Info.Identifier);
await producer.Close();
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[1].To);
+ Assert.Equal(stream, statusInfoReceived[1].Stream);
+ Assert.Equal(stream, statusInfoReceived[0].Stream);
+ Assert.Equal("my_producer_0874", statusInfoReceived[0].Identifier);
+ Assert.Equal("my_producer_0874", statusInfoReceived[1].Identifier);
await system.Close();
}
@@ -153,7 +164,7 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork()
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream, clientProvidedNameLocator);
var count = 0;
var clientProvidedName = Guid.NewGuid().ToString();
- var producer = await Producer.Create(
+ var config =
new ProducerConfig(system, stream)
{
ClientProvidedName = clientProvidedName,
@@ -168,9 +179,10 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork()
return Task.CompletedTask;
},
ReconnectStrategy = new TestBackOffReconnectStrategy()
+ };
+
+ var producer = await Producer.Create(config);
- }
- );
for (var i = 0; i < 5; i++)
{
await producer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}")));
@@ -199,13 +211,15 @@ public async void ProducerHandleDeleteStreamWithMetaDataUpdate()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
var clientProviderName = Guid.NewGuid().ToString();
- var producer = await Producer.Create(
+ var config =
new ProducerConfig(system, stream)
{
ClientProvidedName = clientProviderName,
ConfirmationHandler = _ => Task.CompletedTask
- }
- );
+ };
+ var statusInfoReceived = new List();
+ config.StatusChanged += (status) => { statusInfoReceived.Add(status); };
+ var producer = await Producer.Create(config);
Assert.True(producer.IsOpen());
// When the stream is deleted the producer has to close the
@@ -213,6 +227,18 @@ public async void ProducerHandleDeleteStreamWithMetaDataUpdate()
await system.DeleteStream(stream);
SystemUtils.WaitUntil(() => !producer.IsOpen());
+
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[1].To);
+ Assert.Equal(ChangeStatusReason.MetaDataUpdate, statusInfoReceived[1].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[2].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[2].To);
+ Assert.Equal(ChangeStatusReason.ClosedByUser, statusInfoReceived[2].Reason);
+
await system.Close();
}
@@ -233,7 +259,8 @@ public async void HandleChangeStreamConfigurationWithMetaDataUpdate()
);
Assert.True(producer.IsOpen());
- await producer.OnEntityClosed(system, stream);
+ await producer.OnEntityClosed(system, stream,
+ ChangeStatusReason.UnexpectedlyDisconnected);
SystemUtils.Wait();
Assert.True(producer.IsOpen());
await system.DeleteStream(stream);
@@ -343,7 +370,6 @@ public async void FirstConsumeAfterKillConnectionShouldContinueToWork()
await Task.CompletedTask;
},
ReconnectStrategy = new TestBackOffReconnectStrategy()
-
});
// in this case we kill the connection before consume consume any message
// so it should use the selected OffsetSpec in this case = new OffsetTypeFirst(),
@@ -370,10 +396,11 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
Guid.NewGuid().ToString(),
_testOutputHelper);
var testPassed = new TaskCompletionSource();
+ var statusCompleted = new TaskCompletionSource();
var clientProviderName = Guid.NewGuid().ToString();
var reference = Guid.NewGuid().ToString();
var messagesReceived = 0;
- var consumer = await Consumer.Create(new ConsumerConfig(system, stream)
+ var config = new ConsumerConfig(system, stream)
{
Crc32 = _crc32,
Reference = reference,
@@ -392,7 +419,17 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
await Task.CompletedTask;
},
ReconnectStrategy = new TestBackOffReconnectStrategy()
- });
+ };
+
+ var statusInfoReceived = new List();
+ config.StatusChanged += (status) =>
+ {
+ statusInfoReceived.Add(status);
+ if (statusInfoReceived.Count == 5)
+ statusCompleted.SetResult(true);
+ };
+
+ var consumer = await Consumer.Create(config);
// kill the first time
SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections(clientProviderName).Result == 1);
await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
@@ -402,8 +439,38 @@ await SystemUtils.PublishMessages(system, stream, NumberOfMessages,
new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
// after kill the consumer must be open
Assert.True(consumer.IsOpen());
+ // there is check to have the statusInfoReceived consistent and to
+ // to have the test passed.
+ // In a real situation the test isOpen is always correct but the internal status
+ // is not updated yet. Since the status Reconnection is considered as a valid Open() status
+ new Utils(_testOutputHelper).WaitUntilTaskCompletes(statusCompleted);
+
await consumer.Close();
Assert.False(consumer.IsOpen());
+ // We must have 6 status here
+ Assert.Equal(6, statusInfoReceived.Count);
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[1].To);
+ Assert.Equal(ChangeStatusReason.UnexpectedlyDisconnected, statusInfoReceived[1].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[2].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[2].To);
+ Assert.Equal(ChangeStatusReason.None, statusInfoReceived[2].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[3].From);
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[3].To);
+ Assert.Equal(ChangeStatusReason.UnexpectedlyDisconnected, statusInfoReceived[3].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[4].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[4].To);
+ Assert.Equal(ChangeStatusReason.None, statusInfoReceived[4].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[5].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[5].To);
+ Assert.Equal(ChangeStatusReason.ClosedByUser, statusInfoReceived[5].Reason);
await system.DeleteStream(stream);
await system.Close();
@@ -414,19 +481,86 @@ public async void ConsumerHandleDeleteStreamWithMetaDataUpdate()
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
var clientProviderName = Guid.NewGuid().ToString();
- var consumer = await Consumer.Create(
+ var config =
new ConsumerConfig(system, stream)
{
ClientProvidedName = clientProviderName,
ReconnectStrategy = new TestBackOffReconnectStrategy()
- }
- );
+ };
+ var statusInfoReceived = new List();
+ config.StatusChanged += (status) => { statusInfoReceived.Add(status); };
+ var consumer = await Consumer.Create(config);
Assert.True(consumer.IsOpen());
// When the stream is deleted the consumer has to close the
// connection an become inactive.
await system.DeleteStream(stream);
SystemUtils.WaitUntil(() => !consumer.IsOpen());
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+ Assert.Equal(ChangeStatusReason.None, statusInfoReceived[0].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[1].To);
+ Assert.Equal(ChangeStatusReason.MetaDataUpdate, statusInfoReceived[1].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[2].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[2].To);
+ Assert.Equal(ChangeStatusReason.ClosedByUser, statusInfoReceived[2].Reason);
+ await system.Close();
+ }
+
+ [Fact]
+ public async void ConsumerShouldReceiveBoolFail()
+ {
+ var system = await StreamSystem.Create(new StreamSystemConfig());
+ var config = new ConsumerConfig(system, "DOES_NOT_EXIST")
+ {
+ ReconnectStrategy = new TestBackOffReconnectStrategy()
+ };
+
+ var statusCompleted = new TaskCompletionSource();
+ var statusInfoReceived = new List();
+ config.StatusChanged += status =>
+ {
+ statusInfoReceived.Add(status);
+ if (statusInfoReceived.Count == 1)
+ {
+ statusCompleted.SetResult(true);
+ }
+ };
+ await Assert.ThrowsAsync(async () => await Consumer.Create(config));
+ new Utils(_testOutputHelper).WaitUntilTaskCompletes(statusCompleted);
+ Assert.Equal(ChangeStatusReason.BoolFailure, statusInfoReceived[0].Reason);
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[0].To);
+ await system.Close();
+ }
+
+ [Fact]
+ public async void ProducerShouldReceiveBoolFail()
+ {
+ var system = await StreamSystem.Create(new StreamSystemConfig());
+ var config = new ProducerConfig(system, "DOES_NOT_EXIST")
+ {
+ ReconnectStrategy = new TestBackOffReconnectStrategy()
+ };
+
+ var statusCompleted = new TaskCompletionSource();
+ var statusInfoReceived = new List();
+ config.StatusChanged += status =>
+ {
+ statusInfoReceived.Add(status);
+ if (statusInfoReceived.Count == 1)
+ {
+ statusCompleted.SetResult(true);
+ }
+ };
+ await Assert.ThrowsAsync(async () => await Producer.Create(config));
+ new Utils(_testOutputHelper).WaitUntilTaskCompletes(statusCompleted);
+ Assert.Equal(ChangeStatusReason.BoolFailure, statusInfoReceived[0].Reason);
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[0].To);
await system.Close();
}
@@ -522,7 +656,7 @@ protected override Task CreateNewEntity(bool boot)
return Task.CompletedTask;
}
- UpdateStatus(ReliableEntityStatus.Open);
+ UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None);
// raise the exception only one time
// to avoid loops
_firstTime = false;
@@ -546,7 +680,7 @@ public async void RConsumerShouldStopWhenThrowUnknownException()
new Exception("Fake Exception"));
await Assert.ThrowsAsync(() =>
- c.Init(new BackOffReconnectStrategy(), new ResourceAvailableBackOffReconnectStrategy()));
+ c.Init(new ConsumerConfig(system, stream)));
Assert.False(c.IsOpen());
@@ -567,14 +701,15 @@ await Assert.ThrowsAsync(() =>
public async void ConsumerShouldFailFast(Exception exception)
{
SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream);
- var c = new FakeThrowExceptionConsumer(new ConsumerConfig(system, stream),
+ var config = new ConsumerConfig(system, stream) { ReconnectStrategy = new TestBackOffReconnectStrategy() };
+
+ var c = new FakeThrowExceptionConsumer(config,
exception);
Assert.True(ClientExceptions.IsAKnownException(exception));
try
{
- await c.Init(new BackOffReconnectStrategy(),
- new ResourceAvailableBackOffReconnectStrategy());
+ await c.Init(new ConsumerConfig(system, stream));
}
catch (Exception e)
{
diff --git a/Tests/SuperStreamConsumerTests.cs b/Tests/SuperStreamConsumerTests.cs
index 3ce452fc..ff22b33f 100644
--- a/Tests/SuperStreamConsumerTests.cs
+++ b/Tests/SuperStreamConsumerTests.cs
@@ -39,7 +39,8 @@ public async void NumberOfConnectionsShouldBeEqualsToThePartitions()
ClientProvidedName = clientProvidedName,
Identifier = "super_stream_consumer_88888",
OffsetSpec =
- await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExchange, new OffsetTypeFirst())
+ await SystemUtils.OffsetsForSuperStreamConsumer(system, SystemUtils.InvoicesExchange,
+ new OffsetTypeFirst())
});
Assert.NotNull(consumer);
@@ -358,7 +359,7 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
var listConsumed = new ConcurrentBag();
var testPassed = new TaskCompletionSource();
- var consumer = await Consumer.Create(new ConsumerConfig(system, SystemUtils.InvoicesExchange)
+ var config = new ConsumerConfig(system, SystemUtils.InvoicesExchange)
{
OffsetSpec = new OffsetTypeFirst(),
IsSuperStream = true,
@@ -372,7 +373,12 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
return Task.CompletedTask;
}
- });
+ };
+
+ var statusInfoReceived = new List();
+ config.StatusChanged += status => { statusInfoReceived.Add(status); };
+
+ var consumer = await Consumer.Create(config);
new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed);
Assert.True(testPassed.Task.Result);
@@ -383,6 +389,14 @@ public async void ReliableConsumerNumberOfMessagesConsumedShouldBeEqualsToPublis
Assert.Equal(4,
listConsumed.Sum(x => x == SystemUtils.InvoicesStream2 ? 1 : 0));
await consumer.Close();
+ SystemUtils.Wait();
+ Assert.Equal(2, statusInfoReceived.Count);
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[1].To);
+
await system.Close();
}
@@ -479,7 +493,6 @@ Func> consumerUpdateListener
SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_1").Result == 0);
SystemUtils.WaitUntil(() => SystemUtils.ConnectionsCountByName($"{clientProvidedName}_2").Result == 0);
await consumerSingle.Close();
-
await system.Close();
}
@@ -554,4 +567,81 @@ public async void SaCAddNewConsumerShouldReceiveAllTheMessage()
await secondConsumer.Close();
await system.Close();
}
+
+ [Fact]
+ public async void SuperConsumerShouldReceive4StatusInfo()
+ {
+ SystemUtils.ResetSuperStreams();
+ var system = await StreamSystem.Create(new StreamSystemConfig());
+
+ var clientProvidedName = Guid.NewGuid().ToString();
+ var config = new ConsumerConfig(system, SystemUtils.InvoicesExchange)
+ {
+ ClientProvidedName = clientProvidedName,
+ OffsetSpec = new OffsetTypeFirst(),
+ IsSuperStream = true,
+ ReconnectStrategy = new TestBackOffReconnectStrategy()
+ };
+
+ var statusCompleted = new TaskCompletionSource();
+ var statusInfoReceived = new List();
+ config.StatusChanged += status =>
+ {
+ statusInfoReceived.Add(status);
+ if (statusInfoReceived.Count == 3)
+ {
+ statusCompleted.SetResult(true);
+ }
+ };
+
+ var consumer = await Consumer.Create(config);
+ SystemUtils.WaitUntil(() => SystemUtils.HttpKillConnections($"{clientProvidedName}_0").Result == 1);
+ new Utils(_testOutputHelper).WaitUntilTaskCompletes(statusCompleted);
+
+ Assert.Equal(3, statusInfoReceived.Count);
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[1].To);
+ Assert.Equal(ChangeStatusReason.UnexpectedlyDisconnected, statusInfoReceived[1].Reason);
+
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[2].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[2].To);
+ Assert.Equal(ChangeStatusReason.None, statusInfoReceived[2].Reason);
+ await consumer.Close();
+ SystemUtils.Wait();
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[3].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[3].To);
+ Assert.Equal(ChangeStatusReason.ClosedByUser, statusInfoReceived[3].Reason);
+ await system.Close();
+ }
+
+ [Fact]
+ public async void SuperConsumerShouldReceiveBoolFail()
+ {
+ var system = await StreamSystem.Create(new StreamSystemConfig());
+ var config = new ConsumerConfig(system, "DOES_NOT_EXIST")
+ {
+ IsSuperStream = true,
+ ReconnectStrategy = new TestBackOffReconnectStrategy()
+ };
+
+ var statusCompleted = new TaskCompletionSource();
+ var statusInfoReceived = new List();
+ config.StatusChanged += status =>
+ {
+ statusInfoReceived.Add(status);
+ if (statusInfoReceived.Count == 1)
+ {
+ statusCompleted.SetResult(true);
+ }
+ };
+ await Assert.ThrowsAsync(async () => await Consumer.Create(config));
+ new Utils(_testOutputHelper).WaitUntilTaskCompletes(statusCompleted);
+ Assert.Equal(ChangeStatusReason.BoolFailure, statusInfoReceived[0].Reason);
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[0].To);
+ await system.Close();
+ }
}
diff --git a/Tests/SuperStreamProducerTests.cs b/Tests/SuperStreamProducerTests.cs
index 2637390d..c39ee05b 100644
--- a/Tests/SuperStreamProducerTests.cs
+++ b/Tests/SuperStreamProducerTests.cs
@@ -728,8 +728,9 @@ public async void ReliableProducerHandleConfirmation()
var testPassed = new TaskCompletionSource();
var confirmedList = new ConcurrentBag<(string, Message)>();
var system = await StreamSystem.Create(new StreamSystemConfig());
- var streamProducer = await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
+ var config = new ProducerConfig(system, SystemUtils.InvoicesExchange)
{
+ Identifier = "my_super_producer_908",
SuperStreamConfig =
new SuperStreamConfig() { Routing = message1 => message1.Properties.MessageId.ToString() },
ConfirmationHandler = confirmation =>
@@ -746,7 +747,11 @@ public async void ReliableProducerHandleConfirmation()
return Task.CompletedTask;
}
- });
+ };
+ var statusInfoReceived = new List();
+ config.StatusChanged += status => { statusInfoReceived.Add(status); };
+
+ var streamProducer = await Producer.Create(config);
for (ulong i = 0; i < 20; i++)
{
@@ -762,6 +767,17 @@ public async void ReliableProducerHandleConfirmation()
Assert.Equal(9, confirmedList.Count(x => x.Item1 == SystemUtils.InvoicesStream0));
Assert.Equal(7, confirmedList.Count(x => x.Item1 == SystemUtils.InvoicesStream1));
Assert.Equal(4, confirmedList.Count(x => x.Item1 == SystemUtils.InvoicesStream2));
+ await streamProducer.Close();
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[1].To);
+
+ Assert.Equal("my_super_producer_908", statusInfoReceived[0].Identifier);
+ Assert.Equal("my_super_producer_908", statusInfoReceived[1].Identifier);
+
+ Assert.Equal(SystemUtils.InvoicesExchange, statusInfoReceived[0].Stream);
+ Assert.Equal(SystemUtils.InvoicesExchange, statusInfoReceived[1].Stream);
await system.Close();
}
@@ -776,9 +792,10 @@ public async void ReliableProducerSendMessageConnectionsIfKilled()
var system = await StreamSystem.Create(new StreamSystemConfig());
var clientName = Guid.NewGuid().ToString();
var testPassed = new TaskCompletionSource() { };
+ var statusCompleted = new TaskCompletionSource() { };
var received = 0;
var error = 0;
- var streamProducer = await Producer.Create(new ProducerConfig(system, SystemUtils.InvoicesExchange)
+ var config = new ProducerConfig(system, SystemUtils.InvoicesExchange)
{
SuperStreamConfig =
new SuperStreamConfig() { Routing = message1 => message1.Properties.MessageId.ToString() },
@@ -799,7 +816,19 @@ public async void ReliableProducerSendMessageConnectionsIfKilled()
},
ClientProvidedName = clientName,
ReconnectStrategy = new TestBackOffReconnectStrategy()
- });
+ };
+ var statusInfoReceived = new List();
+ config.StatusChanged += status =>
+ {
+ statusInfoReceived.Add(status);
+ if (statusInfoReceived.Count == 3)
+ {
+ statusCompleted.SetResult(true);
+ }
+ };
+
+ var streamProducer = await Producer.Create(config);
+
for (ulong i = 0; i < 20; i++)
{
var message = new Message(Encoding.Default.GetBytes("hello"))
@@ -824,7 +853,28 @@ public async void ReliableProducerSendMessageConnectionsIfKilled()
SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream1) == 7);
SystemUtils.WaitUntil(() => SystemUtils.HttpGetQMsgCount(SystemUtils.InvoicesStream2) == 4);
+ new Utils(_testOutputHelper).WaitUntilTaskCompletes(statusCompleted);
+
+ Assert.Equal(ReliableEntityStatus.Initialization, statusInfoReceived[0].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[0].To);
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[1].From);
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[1].To);
+ Assert.Equal(ChangeStatusReason.UnexpectedlyDisconnected, statusInfoReceived[1].Reason);
+
+ Assert.Equal(SystemUtils.InvoicesExchange, statusInfoReceived[1].Stream);
+ Assert.Equal(SystemUtils.InvoicesStream0, statusInfoReceived[1].Partition);
+
+ Assert.Equal(ReliableEntityStatus.Reconnection, statusInfoReceived[2].From);
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[2].To);
+ Assert.Equal(ChangeStatusReason.None, statusInfoReceived[2].Reason);
+
await streamProducer.Close();
+
+ Assert.Equal(ReliableEntityStatus.Open, statusInfoReceived[3].From);
+ Assert.Equal(ReliableEntityStatus.Closed, statusInfoReceived[3].To);
+ Assert.Equal(ChangeStatusReason.ClosedByUser, statusInfoReceived[3].Reason);
+
await system.Close();
}
}
diff --git a/docs/ReliableClient/Program.cs b/docs/ReliableClient/Program.cs
new file mode 100644
index 00000000..2f64c89d
--- /dev/null
+++ b/docs/ReliableClient/Program.cs
@@ -0,0 +1,24 @@
+// See https://aka.ms/new-console-template for more information
+
+using ReliableClient;
+
+Console.WriteLine("Starting RabbitMQ Streaming Client");
+
+
+var rClient = RClient.Start(new RClient.Config()
+{
+ ProducersPerConnection = 2,
+ ConsumersPerConnection = 2,
+ Host = "Node0",
+ Port = 5553,
+ LoadBalancer = true,
+ SuperStream = true,
+ Streams = 1,
+ Producers = 1,
+ MessagesPerProducer = 50_000_000,
+ Consumers = 4
+ // Username = "test",
+ // Password = "test"
+});
+
+await rClient.ConfigureAwait(false);
diff --git a/docs/ReliableClient/RClient.cs b/docs/ReliableClient/RClient.cs
new file mode 100644
index 00000000..9a31e611
--- /dev/null
+++ b/docs/ReliableClient/RClient.cs
@@ -0,0 +1,281 @@
+// This source code is dual-licensed under the Apache License, version
+// 2.0, and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System.Collections.Concurrent;
+using System.Net;
+using System.Text;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Console;
+using RabbitMQ.Stream.Client;
+using RabbitMQ.Stream.Client.AMQP;
+using RabbitMQ.Stream.Client.Reliable;
+
+namespace ReliableClient;
+
+public class RClient
+{
+ public record Config
+ {
+ public string Host { get; set; } = "localhost";
+ public int Port { get; set; } = 5552;
+ public string Username { get; set; } = "guest";
+ public string Password { get; set; } = "guest";
+ public bool LoadBalancer { get; set; } = false;
+ public bool SuperStream { get; set; } = false;
+ public int Streams { get; set; } = 1;
+ public int Producers { get; set; } = 9;
+ public byte ProducersPerConnection { get; set; } = 7;
+ public int MessagesPerProducer { get; set; } = 5_000_000;
+ public int Consumers { get; set; } = 9;
+ public byte ConsumersPerConnection { get; set; } = 8;
+ }
+
+ public static async Task Start(Config config)
+ {
+ var serviceCollection = new ServiceCollection();
+ serviceCollection.AddLogging(builder => builder
+ .AddSimpleConsole(options =>
+ {
+ options.IncludeScopes = true;
+ options.SingleLine = true;
+ options.TimestampFormat = "[HH:mm:ss] ";
+ options.ColorBehavior = LoggerColorBehavior.Default;
+ })
+ .AddFilter(level => level >= LogLevel.Information)
+ );
+ var loggerFactory = serviceCollection.BuildServiceProvider()
+ .GetService();
+
+
+ if (loggerFactory != null)
+ {
+ var lp = loggerFactory.CreateLogger();
+ var lc = loggerFactory.CreateLogger();
+
+ var ep = new IPEndPoint(IPAddress.Loopback, config.Port);
+
+ if (config.Host != "localhost")
+ {
+ switch (Uri.CheckHostName(config.Host))
+ {
+ case UriHostNameType.IPv4:
+ ep = new IPEndPoint(IPAddress.Parse(config.Host), config.Port);
+ break;
+ case UriHostNameType.Dns:
+ var addresses = await Dns.GetHostAddressesAsync(config.Host).ConfigureAwait(false);
+ ep = new IPEndPoint(addresses[0], config.Port);
+ break;
+ }
+ }
+
+ var streamConf = new StreamSystemConfig()
+ {
+ UserName = config.Username,
+ Password = config.Password,
+ Endpoints = new List() {ep},
+ ConnectionPoolConfig = new ConnectionPoolConfig()
+ {
+ ProducersPerConnection = config.ProducersPerConnection,
+ ConsumersPerConnection = config.ConsumersPerConnection,
+ }
+ };
+
+
+ if (config.LoadBalancer)
+ {
+ var resolver = new AddressResolver(ep);
+ streamConf = new StreamSystemConfig()
+ {
+ AddressResolver = resolver,
+ UserName = config.Username,
+ Password = config.Password,
+ Endpoints = new List() {resolver.EndPoint}
+ };
+ }
+
+
+ var system = await StreamSystem.Create(streamConf).ConfigureAwait(false);
+ var streamsList = new List();
+ if (config.SuperStream)
+ {
+ streamsList.Add("invoices");
+ }
+ else
+ {
+ for (var i = 0; i < config.Streams; i++)
+ {
+ streamsList.Add($"invoices-{i}");
+ }
+ }
+
+
+ var totalConfirmed = 0;
+ var totalError = 0;
+ var totalConsumed = 0;
+ var totalSent = 0;
+ var isRunning = true;
+
+ _ = Task.Run(() =>
+ {
+ while (isRunning)
+ {
+ Console.WriteLine(
+ $"When: {DateTime.Now}, " +
+ $"Tr {System.Diagnostics.Process.GetCurrentProcess().Threads.Count}, " +
+ $"Sent: {totalSent:#,##0.00}, " +
+ $"Conf: {totalConfirmed:#,##0.00}, " +
+ $"Error: {totalError:#,##0.00}, " +
+ $"Total: {(totalConfirmed + totalError):#,##0.00}, " +
+ $"Consumed: {totalConsumed:#,##0.00}, " +
+ $"Sent per stream: {totalSent / streamsList.Count}");
+ Thread.Sleep(5000);
+ }
+ });
+ List consumersList = new();
+ List producersList = new();
+ var obj = new object();
+ foreach (var stream in streamsList)
+ {
+ if (!config.SuperStream)
+ {
+ if (await system.StreamExists(stream).ConfigureAwait(false))
+ {
+ await system.DeleteStream(stream).ConfigureAwait(false);
+ }
+
+ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_000,})
+ .ConfigureAwait(false);
+ await Task.Delay(TimeSpan.FromSeconds(3)).ConfigureAwait(false);
+ }
+
+ for (var z = 0; z < config.Consumers; z++)
+ {
+ var conf = new ConsumerConfig(system, stream)
+ {
+ OffsetSpec = new OffsetTypeFirst(),
+ IsSuperStream = config.SuperStream,
+ IsSingleActiveConsumer = config.SuperStream,
+ Reference = "myApp",
+ Identifier = $"my_c_{z}",
+ InitialCredits = 10,
+ MessageHandler = (source, ctx, _, _) =>
+ {
+ Interlocked.Increment(ref totalConsumed);
+ return Task.CompletedTask;
+ },
+ };
+ conf.StatusChanged += (status) =>
+ {
+ var streamInfo = status.Partition is not null
+ ? $" Partition {status.Partition} of super stream: {status.Stream}"
+ : $"Stream: {status.Stream}";
+
+ lc.LogInformation("Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}",
+ status.Identifier, status.From, status.To,status.Reason, streamInfo);
+ };
+ consumersList.Add(
+ await Consumer.Create(conf, lc).ConfigureAwait(false));
+ }
+
+ async Task MaybeSend(Producer producer, Message message, ManualResetEvent publishEvent)
+ {
+ publishEvent.WaitOne();
+ await producer.Send(message).ConfigureAwait(false);
+ }
+
+ for (var z = 0; z < config.Producers; z++)
+ {
+ var z1 = z;
+ _ = Task.Run(async () =>
+ {
+ var unconfirmedMessages = new ConcurrentBag();
+ var publishEvent = new ManualResetEvent(false);
+
+ var producerConfig = new ProducerConfig(system, stream)
+ {
+ Identifier = $"my_super_{z1}",
+ SuperStreamConfig = new SuperStreamConfig()
+ {
+ Enabled = config.SuperStream, Routing = msg => msg.Properties.MessageId.ToString(),
+ },
+ ConfirmationHandler = confirmation =>
+ {
+ if (confirmation.Status != ConfirmationStatus.Confirmed)
+ {
+ confirmation.Messages.ForEach(m => { unconfirmedMessages.Add(m); });
+
+ Interlocked.Add(ref totalError, confirmation.Messages.Count);
+ return Task.CompletedTask;
+ }
+
+ Interlocked.Add(ref totalConfirmed, confirmation.Messages.Count);
+ return Task.CompletedTask;
+ },
+ };
+ producerConfig.StatusChanged += (status) =>
+ {
+ var streamInfo = status.Partition is not null
+ ? $" Partition {status.Partition} of super stream: {status.Stream}"
+ : $"Stream: {status.Stream}";
+
+ lp.LogInformation("Consumer: {Id} - status changed from: {From} to: {To} reason: {Reason} {Info}",
+ status.Identifier, status.From, status.To,status.Reason, streamInfo);
+
+ if (status.To == ReliableEntityStatus.Open)
+ {
+ publishEvent.Set();
+ }
+ else
+ {
+ publishEvent.Reset();
+ }
+ };
+ var producer = await Producer.Create(producerConfig, lp).ConfigureAwait(false);
+ lock (obj)
+ {
+ producersList.Add(producer);
+ }
+
+ for (var i = 0; i < config.MessagesPerProducer; i++)
+ {
+ if (!unconfirmedMessages.IsEmpty)
+ {
+ var msgs = unconfirmedMessages.ToArray();
+ unconfirmedMessages.Clear();
+ foreach (var msg in msgs)
+ {
+ await MaybeSend(producer, msg, publishEvent).ConfigureAwait(false);
+ Interlocked.Increment(ref totalSent);
+ }
+ }
+
+ var message = new Message(Encoding.Default.GetBytes("hello"))
+ {
+ Properties = new Properties() {MessageId = $"hello{i}"}
+ };
+ await MaybeSend(producer, message, publishEvent).ConfigureAwait(false);
+ await Task.Delay(500).ConfigureAwait(false);
+ Interlocked.Increment(ref totalSent);
+ }
+ });
+ }
+ }
+
+
+ Console.WriteLine("Press any key to close all the consumers");
+ Console.ReadKey();
+ isRunning = false;
+ Console.WriteLine("closing the producers ..... ");
+ producersList.ForEach(async p => await p.Close().ConfigureAwait(false));
+ Console.WriteLine("closing the consumers ..... ");
+ consumersList.ForEach(async c => await c.Close().ConfigureAwait(false));
+
+ Console.WriteLine("Press any key to close all");
+ Console.ReadKey();
+ }
+
+ Console.WriteLine("Closed all the consumers and producers");
+ }
+}
diff --git a/docs/ReliableClient/ReliableClient.csproj b/docs/ReliableClient/ReliableClient.csproj
new file mode 100644
index 00000000..413536df
--- /dev/null
+++ b/docs/ReliableClient/ReliableClient.csproj
@@ -0,0 +1,18 @@
+
+
+
+ Exe
+ net7.0
+ enable
+ enable
+
+
+
+
+
+
+
+
+
+
+
diff --git a/kubernetes/stream_cluster.yaml b/kubernetes/stream_cluster.yaml
index 74acc938..184e5e11 100644
--- a/kubernetes/stream_cluster.yaml
+++ b/kubernetes/stream_cluster.yaml
@@ -7,22 +7,25 @@ apiVersion: rabbitmq.com/v1beta1
kind: RabbitmqCluster
metadata:
name: tls
- namespace: stream-clients-test-1
+ namespace: stream-clients-test
spec:
replicas: 3
image: rabbitmq:3.13-rc-management
service:
type: LoadBalancer
- tls:
- secretName: tls-secret
+ # tls:
+ # secretName: tls-secret
resources:
requests:
- cpu: 500m
+ cpu: 1
memory: 1Gi
limits:
- cpu: 800m
+ cpu: 1
memory: 1Gi
rabbitmq:
additionalPlugins:
- rabbitmq_stream
- - rabbitmq_stream_management
\ No newline at end of file
+ - rabbitmq_stream_management
+ additionalConfig: |
+ log.console = true
+ log.console.level = debug
\ No newline at end of file