Skip to content

Commit 2767f37

Browse files
authored
add reason (#350)
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent d31a55d commit 2767f37

File tree

12 files changed

+186
-62
lines changed

12 files changed

+186
-62
lines changed

RabbitMQ.Stream.Client/PublicAPI.Unshipped.txt

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,13 @@ RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.get
184184
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.ConnectionClosedHandler.set -> void
185185
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.get -> RabbitMQ.Stream.Client.RoutingStrategyType
186186
RabbitMQ.Stream.Client.RawSuperStreamProducerConfig.RoutingStrategyType.set -> void
187+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
188+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.BoolFailure = 5 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
189+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByStrategyPolicy = 4 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
190+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.ClosedByUser = 3 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
191+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.MetaDataUpdate = 2 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
192+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None = 0 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
193+
RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.UnexpectedlyDisconnected = 1 -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
187194
RabbitMQ.Stream.Client.Reliable.Consumer.Info.get -> RabbitMQ.Stream.Client.ConsumerInfo
188195
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.get -> RabbitMQ.Stream.Client.ICrc32
189196
RabbitMQ.Stream.Client.Reliable.ConsumerConfig.Crc32.set -> void
@@ -209,7 +216,7 @@ RabbitMQ.Stream.Client.Reliable.ProducerConfig.Filter.set -> void
209216
RabbitMQ.Stream.Client.Reliable.ProducerConfig.Reference.set -> void
210217
RabbitMQ.Stream.Client.Reliable.ProducerFactory.CreateProducer(bool boot) -> System.Threading.Tasks.Task<RabbitMQ.Stream.Client.IProducer>
211218
RabbitMQ.Stream.Client.Reliable.ProducerFactory._producer -> RabbitMQ.Stream.Client.IProducer
212-
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, string partition = null) -> void
219+
RabbitMQ.Stream.Client.Reliable.ReliableBase.UpdateStatus(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus newStatus, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason reason, string partition = null) -> void
213220
RabbitMQ.Stream.Client.Reliable.ReliableBase._status -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
214221
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.get -> string
215222
RabbitMQ.Stream.Client.Reliable.ReliableConfig.Identifier.set -> void
@@ -220,19 +227,20 @@ RabbitMQ.Stream.Client.Reliable.ReliableConfig.ResourceAvailableReconnectStrateg
220227
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChanged -> RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
221228
RabbitMQ.Stream.Client.Reliable.ReliableConfig.StatusChangedHandler
222229
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
223-
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 4 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
230+
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Closed = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
224231
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Initialization = 0 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
225232
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Open = 1 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
226-
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.ReconnectionForMetaDataUpdate = 3 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
227-
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
233+
RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus.Reconnection = 2 -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
228234
RabbitMQ.Stream.Client.Reliable.StatusInfo
229235
RabbitMQ.Stream.Client.Reliable.StatusInfo.From.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus
230236
RabbitMQ.Stream.Client.Reliable.StatusInfo.From.init -> void
231237
RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.get -> string
232238
RabbitMQ.Stream.Client.Reliable.StatusInfo.Identifier.init -> void
233239
RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.get -> string
234240
RabbitMQ.Stream.Client.Reliable.StatusInfo.Partition.init -> void
235-
RabbitMQ.Stream.Client.Reliable.StatusInfo.StatusInfo(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus From, RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus To, string Stream, string Identifier, string Partition) -> void
241+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.get -> RabbitMQ.Stream.Client.Reliable.ChangeStatusReason
242+
RabbitMQ.Stream.Client.Reliable.StatusInfo.Reason.init -> void
243+
RabbitMQ.Stream.Client.Reliable.StatusInfo.StatusInfo(RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus From, RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus To, string Stream, string Identifier, string Partition, RabbitMQ.Stream.Client.Reliable.ChangeStatusReason Reason = RabbitMQ.Stream.Client.Reliable.ChangeStatusReason.None) -> void
236244
RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.get -> string
237245
RabbitMQ.Stream.Client.Reliable.StatusInfo.Stream.init -> void
238246
RabbitMQ.Stream.Client.Reliable.StatusInfo.To.get -> RabbitMQ.Stream.Client.Reliable.ReliableEntityStatus

RabbitMQ.Stream.Client/Reliable/Consumer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,11 +204,11 @@ public override async Task Close()
204204
{
205205
if (_status == ReliableEntityStatus.Initialization)
206206
{
207-
UpdateStatus(ReliableEntityStatus.Closed);
207+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
208208
return;
209209
}
210210

211-
UpdateStatus(ReliableEntityStatus.Closed);
211+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
212212
await CloseEntity().ConfigureAwait(false);
213213
_logger?.LogDebug("Consumer {Identity} closed", ToString());
214214
}

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,12 @@ private async Task<IConsumer> StandardConsumer(bool boot)
6868
}
6969

7070
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
71-
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected).ConfigureAwait(false);
71+
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
7272
},
7373
MetadataHandler = async _ =>
7474
{
7575
await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
76-
ReliableEntityStatus.ReconnectionForMetaDataUpdate).ConfigureAwait(false);
76+
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
7777
},
7878
MessageHandler = async (consumer, ctx, message) =>
7979
{
@@ -137,15 +137,15 @@ private async Task<IConsumer> SuperConsumer(bool boot)
137137

138138
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
139139
await OnEntityClosed(_consumerConfig.StreamSystem, partitionStream, r,
140-
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected)
140+
ChangeStatusReason.UnexpectedlyDisconnected)
141141
.ConfigureAwait(false);
142142
},
143143
MetadataHandler = async update =>
144144
{
145145
await RandomWait().ConfigureAwait(false);
146146
var r = ((RawSuperStreamConsumer)(_consumer)).ReconnectPartition;
147147
await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
148-
ReliableEntityStatus.ReconnectionForMetaDataUpdate)
148+
ChangeStatusReason.MetaDataUpdate)
149149
.ConfigureAwait(false);
150150
},
151151
MessageHandler = async (partitionStream, consumer, ctx, message) =>

RabbitMQ.Stream.Client/Reliable/Producer.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,11 @@ public override async Task Close()
201201
{
202202
if (ReliableEntityStatus.Initialization == _status)
203203
{
204-
UpdateStatus(ReliableEntityStatus.Closed);
204+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
205205
return;
206206
}
207207

208-
UpdateStatus(ReliableEntityStatus.Closed);
208+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByUser);
209209
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
210210
try
211211
{

RabbitMQ.Stream.Client/Reliable/ProducerFactory.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ private async Task<IProducer> SuperStreamProducer(bool boot)
5656

5757
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
5858
await OnEntityClosed(_producerConfig.StreamSystem, partitionStream, r,
59-
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected)
59+
ChangeStatusReason.UnexpectedlyDisconnected)
6060
.ConfigureAwait(false);
6161
},
6262
MetadataHandler = async update =>
6363
{
6464
await RandomWait().ConfigureAwait(false);
6565
var r = ((RawSuperStreamProducer)(_producer)).ReconnectPartition;
6666
await OnEntityClosed(_producerConfig.StreamSystem, update.Stream, r,
67-
ReliableEntityStatus.ReconnectionForMetaDataUpdate)
67+
ChangeStatusReason.MetaDataUpdate)
6868
.ConfigureAwait(false);
6969
},
7070
ConfirmHandler = confirmationHandler =>
@@ -106,7 +106,7 @@ private async Task<IProducer> StandardProducer()
106106
{
107107
await RandomWait().ConfigureAwait(false);
108108
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
109-
ReliableEntityStatus.ReconnectionForMetaDataUpdate).ConfigureAwait(false);
109+
ChangeStatusReason.MetaDataUpdate).ConfigureAwait(false);
110110
},
111111
ConnectionClosedHandler = async (closeReason) =>
112112
{
@@ -118,7 +118,7 @@ await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
118118
}
119119

120120
await OnEntityClosed(_producerConfig.StreamSystem, _producerConfig.Stream,
121-
ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected).ConfigureAwait(false);
121+
ChangeStatusReason.UnexpectedlyDisconnected).ConfigureAwait(false);
122122
},
123123
ConfirmHandler = confirmation =>
124124
{

RabbitMQ.Stream.Client/Reliable/ReliableBase.cs

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,26 @@ namespace RabbitMQ.Stream.Client.Reliable;
1717
/// <param name="Stream"> Stream or SuperSuper affected</param>
1818
/// <param name="Identifier"> The Entity Identifier </param>
1919
/// <param name="Partition"> Super stream partition. Valid only for SuperStream else is empty</param>
20+
/// <param name="Reason"> The reason why the status changed </param>
2021
public record StatusInfo(
21-
ReliableEntityStatus From,
22-
ReliableEntityStatus To,
22+
ReliableEntityStatus From, // init
23+
ReliableEntityStatus To, // open
2324
string Stream,
2425
string Identifier,
25-
string Partition
26+
string Partition,
27+
ChangeStatusReason Reason = ChangeStatusReason.None
2628
);
2729

30+
public enum ChangeStatusReason
31+
{
32+
None,
33+
UnexpectedlyDisconnected,
34+
MetaDataUpdate,
35+
ClosedByUser,
36+
ClosedByStrategyPolicy,
37+
BoolFailure
38+
}
39+
2840
public record ReliableConfig
2941
{
3042
/// <summary>
@@ -81,8 +93,7 @@ public enum ReliableEntityStatus
8193
{
8294
Initialization, // the entity is initializing
8395
Open, // the entity is open and ready to use
84-
ReconnectionForUnexpectedlyDisconnected, // the entity is disconnected in an unexpected way but still considered open
85-
ReconnectionForMetaDataUpdate, // the entity is disconnected because the stream topology has changed but still considered open
96+
Reconnection, // the entity is in reconnection but it is still considered open
8697
Closed, // the entity is closed and cannot be used anymore
8798
}
8899

@@ -103,7 +114,8 @@ protected static async Task RandomWait()
103114
await Task.Delay(Consts.RandomMid()).ConfigureAwait(false);
104115
}
105116

106-
protected void UpdateStatus(ReliableEntityStatus newStatus, string partition = null)
117+
protected void UpdateStatus(ReliableEntityStatus newStatus,
118+
ChangeStatusReason reason, string partition = null)
107119
{
108120
var oldStatus = _status;
109121
lock (_lock)
@@ -113,7 +125,7 @@ protected void UpdateStatus(ReliableEntityStatus newStatus, string partition = n
113125
{
114126
_reliableConfig.OnStatusChanged(new StatusInfo(oldStatus, newStatus,
115127
_reliableConfig.Stream,
116-
_reliableConfig.Identifier, partition));
128+
_reliableConfig.Identifier, partition, reason));
117129
}
118130
}
119131
}
@@ -150,7 +162,7 @@ private async Task MaybeInit(bool boot)
150162
// else there are two ways:
151163
// - the exception is a known exception and the client will try to reconnect
152164
// - the exception is not a known exception and the client will throw the exception
153-
UpdateStatus(ReliableEntityStatus.Open);
165+
UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None);
154166
}
155167
catch (Exception e)
156168
{
@@ -159,7 +171,7 @@ private async Task MaybeInit(bool boot)
159171
BaseLogger.LogError("{Identity} Error during the first boot {EMessage}",
160172
ToString(), e.Message);
161173
// if it is the first boot we don't need to reconnect
162-
UpdateStatus(ReliableEntityStatus.Closed);
174+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.BoolFailure);
163175
throw;
164176
}
165177

@@ -188,7 +200,7 @@ private async Task Init(bool boot)
188200
// each time that the client is initialized, we need to reset the status
189201
// if we hare here it means that the entity is not open for some reason like:
190202
// first time initialization or reconnect due of a IsAKnownException
191-
UpdateStatus(ReliableEntityStatus.Initialization);
203+
UpdateStatus(ReliableEntityStatus.Initialization, ChangeStatusReason.None);
192204

193205
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
194206
try
@@ -275,7 +287,7 @@ private async Task MaybeReconnect()
275287
if (!reconnect)
276288
{
277289
BaseLogger.LogDebug("{Identity} is closed due of reconnect strategy", ToString());
278-
UpdateStatus(ReliableEntityStatus.Closed);
290+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByStrategyPolicy);
279291
return;
280292
}
281293

@@ -285,8 +297,7 @@ private async Task MaybeReconnect()
285297
await MaybeInit(false).ConfigureAwait(false);
286298
break;
287299
case false:
288-
if (CompareStatus(ReliableEntityStatus.ReconnectionForMetaDataUpdate) ||
289-
CompareStatus(ReliableEntityStatus.ReconnectionForUnexpectedlyDisconnected))
300+
if (CompareStatus(ReliableEntityStatus.Reconnection))
290301
{
291302
BaseLogger.LogDebug("{Identity} is in Reconnecting", ToString());
292303
}
@@ -304,14 +315,14 @@ private async Task MaybeReconnectPartition(StreamInfo streamInfo, string info,
304315
if (!reconnect)
305316
{
306317
BaseLogger.LogDebug("{Identity} partition is closed due of reconnect strategy", ToString());
307-
UpdateStatus(ReliableEntityStatus.Closed, streamInfo.Stream);
318+
UpdateStatus(ReliableEntityStatus.Closed, ChangeStatusReason.ClosedByStrategyPolicy, streamInfo.Stream);
308319
return;
309320
}
310321

311322
try
312323
{
313324
await reconnectPartitionFunc(streamInfo).ConfigureAwait(false);
314-
UpdateStatus(ReliableEntityStatus.Open, streamInfo.Stream);
325+
UpdateStatus(ReliableEntityStatus.Open, ChangeStatusReason.None, streamInfo.Stream);
315326
await _reliableConfig.ReconnectStrategy.WhenConnected(
316327
$"Super Stream partition: {streamInfo.Stream} for {info}").ConfigureAwait(false);
317328
}
@@ -353,11 +364,12 @@ private void LogException(Exception exception)
353364
/// <param name="reconnectPartitionFunc">Function to reconnect the partition</param>
354365
/// <param name="reason">The reason why the connection is closed (Metadata update od disconnection)</param>
355366
internal async Task OnEntityClosed(StreamSystem system, string stream,
356-
Func<StreamInfo, Task> reconnectPartitionFunc, ReliableEntityStatus reason)
367+
Func<StreamInfo, Task> reconnectPartitionFunc, ChangeStatusReason reason)
357368
{
358369
var streamExists = false;
359370
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
360-
UpdateStatus(reason, stream);
371+
UpdateStatus(ReliableEntityStatus.Reconnection, reason,
372+
stream);
361373
try
362374
{
363375
streamExists = await CheckIfStreamIsAvailable(stream, system)
@@ -385,11 +397,11 @@ internal async Task OnEntityClosed(StreamSystem system, string stream,
385397
/// <param name="system">Stream system</param>
386398
/// <param name="stream">Stream</param>
387399
/// <param name="reason">The reason why the connection is closed (Metadata update od disconnection)</param>
388-
internal async Task OnEntityClosed(StreamSystem system, string stream, ReliableEntityStatus reason)
400+
internal async Task OnEntityClosed(StreamSystem system, string stream, ChangeStatusReason reason)
389401
{
390402
var streamExists = false;
391403
await SemaphoreSlim.WaitAsync().ConfigureAwait(false);
392-
UpdateStatus(reason, stream);
404+
UpdateStatus(ReliableEntityStatus.Reconnection, reason, stream);
393405
try
394406
{
395407
streamExists = await CheckIfStreamIsAvailable(stream, system)

0 commit comments

Comments
 (0)