Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,13 @@ RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
Expand All @@ -209,7 +216,7 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, string partition = null) -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason reason, string partition = null) -> void
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.set -> void
Expand All @@ -220,19 +227,20 @@ RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrateg
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChanged -> RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 4 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Initialization = 0 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Open = 1 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.ReconnectionForMetaDataUpdate = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Reconnection = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.StatusInfo
RabbitMQ.Stream.Client.Reliable.StatusInfo.From.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
RabbitMQ.Stream.Client.Reliable.StatusInfo.From.init -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.get -> string
RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.init -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.get -> string
RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.init -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.StatusInfo(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus From, RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus To, string Stream, string Identifier, string Partition) -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.get -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.init -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.StatusInfo(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus From, RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus To, string Stream, string Identifier, string Partition, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason Reason = RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None) -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.get -> string
RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.init -> void
RabbitMQ.Stream.Client.Reliable.StatusInfo.To.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,11 @@ public override async Task Close()
{
if (_status == ReliableEntityStatus.Initialization)
{
UpdateStatus(ReliableEntityStatus.Closed);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
return;
}

UpdateStatus(ReliableEntityStatus.Closed);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
await CloseEntity().ConfigureAwait(false);
_logger?.LogDebug("Consumer {Identity} closed", ToString());
}
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ private async Task<IConsumer> StandardConsumer(bool boot)
}

await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected).ConfigureAwait(false);
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
},
MetadataHandler = async _ =>
{
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
ReliableEntityStatus.ReconnectionForMetaDataUpdate).ConfigureAwait(false);
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
},
MessageHandler = async (consumer, ctx, message) =>
{
Expand Down Expand Up @@ -137,15 +137,15 @@ private async Task<IConsumer> SuperConsumer(bool boot)

var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected)
ChangeStatusReason.UnexpectedlyDisconnected)
.ConfigureAwait(false);
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
ReliableEntityStatus.ReconnectionForMetaDataUpdate)
ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
},
MessageHandler = async (partitionStream, consumer, ctx, message) =>
Expand Down
4 changes: 2 additions & 2 deletions RabbitMQ.Stream.Client/Reliable/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ public override async Task Close()
{
if (ReliableEntityStatus.Initialization == _status)
{
UpdateStatus(ReliableEntityStatus.Closed);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
return;
}

UpdateStatus(ReliableEntityStatus.Closed);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
try
{
Expand Down
8 changes: 4 additions & 4 deletions RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ private async Task<IProducer> SuperStreamProducer(bool boot)

var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected)
ChangeStatusReason.UnexpectedlyDisconnected)
.ConfigureAwait(false);
},
MetadataHandler = async update =>
{
await RandomWait().ConfigureAwait(false);
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
ReliableEntityStatus.ReconnectionForMetaDataUpdate)
ChangeStatusReason.MetaDataUpdate)
.ConfigureAwait(false);
},
ConfirmHandler = confirmationHandler =>
Expand Down Expand Up @@ -106,7 +106,7 @@ private async Task<IProducer> StandardProducer()
{
await RandomWait().ConfigureAwait(false);
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
ReliableEntityStatus.ReconnectionForMetaDataUpdate).ConfigureAwait(false);
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
},
ConnectionClosedHandler = async (closeReason) =>
{
Expand All @@ -118,7 +118,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
}

await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected).ConfigureAwait(false);
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
},
ConfirmHandler = confirmation =>
{
Expand Down
50 changes: 31 additions & 19 deletions RabbitMQ.Stream.Client/Reliable/ReliableBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,26 @@ namespace RabbitMQ.Stream.Client.Reliable;
/// <param name="Stream"> Stream or SuperSuper affected</param>
/// <param name="Identifier"> The Entity Identifier </param>
/// <param name="Partition"> Super stream partition. Valid only for SuperStream else is empty</param>
/// <param name="Reason"> The reason why the status changed </param>
public record StatusInfo(
ReliableEntityStatus From,
ReliableEntityStatus To,
ReliableEntityStatus From, // init
ReliableEntityStatus To, // open
string Stream,
string Identifier,
string Partition
string Partition,
ChangeStatusReason Reason = ChangeStatusReason.None
);

public enum ChangeStatusReason
{
None,
UnexpectedlyDisconnected,
MetaDataUpdate,
ClosedByUser,
ClosedByStrategyPolicy,
BoolFailure
}

public record ReliableConfig
{
/// <summary>
Expand Down Expand Up @@ -81,8 +93,7 @@ public enum ReliableEntityStatus
{
Initialization, // the entity is initializing
Open, // the entity is open and ready to use
ReconnectionForUnexpectedlyDisconnected, // the entity is disconnected in an unexpected way but still considered open
ReconnectionForMetaDataUpdate, // the entity is disconnected because the stream topology has changed but still considered open
Reconnection, // the entity is in reconnection but it is still considered open
Closed, // the entity is closed and cannot be used anymore
}

Expand All @@ -103,7 +114,8 @@ protected static async Task RandomWait()
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
}

protected void UpdateStatus(ReliableEntityStatus newStatus, string partition = null)
protected void UpdateStatus(ReliableEntityStatus newStatus,
ChangeStatusReason reason, string partition = null)
{
var oldStatus = _status;
lock (_lock)
Expand All @@ -113,7 +125,7 @@ protected void UpdateStatus(ReliableEntityStatus newStatus, string partition = n
{
_reliableConfig.OnStatusChanged(new StatusInfo(oldStatus, newStatus,
_reliableConfig.Stream,
_reliableConfig.Identifier, partition));
_reliableConfig.Identifier, partition, reason));
}
}
}
Expand Down Expand Up @@ -150,7 +162,7 @@ private async Task MaybeInit(bool boot)
// else there are two ways:
// - the exception is a known exception and the client will try to reconnect
// - the exception is not a known exception and the client will throw the exception
UpdateStatus(ReliableEntityStatus.Open);
UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None);
}
catch (Exception e)
{
Expand All @@ -159,7 +171,7 @@ private async Task MaybeInit(bool boot)
BaseLogger.LogError("{Identity} Error during the first boot {EMessage}",
ToString(), e.Message);
// if it is the first boot we don't need to reconnect
UpdateStatus(ReliableEntityStatus.Closed);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.BoolFailure);
throw;
}

Expand Down Expand Up @@ -188,7 +200,7 @@ private async Task Init(bool boot)
// each time that the client is initialized, we need to reset the status
// if we hare here it means that the entity is not open for some reason like:
// first time initialization or reconnect due of a IsAKnownException
UpdateStatus(ReliableEntityStatus.Initialization);
UpdateStatus(ReliableEntityStatus.Initialization, ChangeStatusReason.None);

await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
try
Expand Down Expand Up @@ -275,7 +287,7 @@ private async Task MaybeReconnect()
if (!reconnect)
{
BaseLogger.LogDebug("{Identity} is closed due of reconnect strategy", ToString());
UpdateStatus(ReliableEntityStatus.Closed);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByStrategyPolicy);
return;
}

Expand All @@ -285,8 +297,7 @@ private async Task MaybeReconnect()
await MaybeInit(false).ConfigureAwait(false);
break;
case false:
if (CompareStatus(ReliableEntityStatus.ReconnectionForMetaDataUpdate) ||
CompareStatus(ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected))
if (CompareStatus(ReliableEntityStatus.Reconnection))
{
BaseLogger.LogDebug("{Identity} is in Reconnecting", ToString());
}
Expand All @@ -304,14 +315,14 @@ private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info,
if (!reconnect)
{
BaseLogger.LogDebug("{Identity} partition is closed due of reconnect strategy", ToString());
UpdateStatus(ReliableEntityStatus.Closed, streamInfo.Stream);
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByStrategyPolicy, streamInfo.Stream);
return;
}

try
{
await reconnectPartitionFunc(streamInfo).ConfigureAwait(false);
UpdateStatus(ReliableEntityStatus.Open, streamInfo.Stream);
UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None, streamInfo.Stream);
await _reliableConfig.ReconnectStrategy.WhenConnected(
$"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false);
}
Expand Down Expand Up @@ -353,11 +364,12 @@ private void LogException(Exception exception)
/// <param name="reconnectPartitionFunc">Function to reconnect the partition</param>
/// <param name="reason">The reason why the connection is closed (Metadata update od disconnection)</param>
internal async Task OnEntityClosed(StreamSystem system, string stream,
Func<StreamInfo, Task> reconnectPartitionFunc, ReliableEntityStatus reason)
Func<StreamInfo, Task> reconnectPartitionFunc, ChangeStatusReason reason)
{
var streamExists = false;
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
UpdateStatus(reason, stream);
UpdateStatus(ReliableEntityStatus.Reconnection, reason,
stream);
try
{
streamExists = await CheckIfStreamIsAvailable(stream, system)
Expand Down Expand Up @@ -385,11 +397,11 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
/// <param name="system">Stream system</param>
/// <param name="stream">Stream</param>
/// <param name="reason">The reason why the connection is closed (Metadata update od disconnection)</param>
internal async Task OnEntityClosed(StreamSystem system, string stream, ReliableEntityStatus reason)
internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeStatusReason reason)
{
var streamExists = false;
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
UpdateStatus(reason, stream);
UpdateStatus(ReliableEntityStatus.Reconnection, reason, stream);
try
{
streamExists = await CheckIfStreamIsAvailable(stream, system)
Expand Down
Loading