Skip to content

Commit bca7e00

Browse files
committed
fix connection name
Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent c406666 commit bca7e00

File tree

3 files changed

+36
-19
lines changed

3 files changed

+36
-19
lines changed

RabbitMQ.Stream.Client/RoutingClient.cs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,15 +67,23 @@ internal static async Task<IClient> LookupConnection(
6767
// In this case we just return the node (leader for producer, random for consumer)
6868
// since there is not load balancer configuration
6969

70-
return await routing.CreateClient(clientParameters with { Endpoint = endPointNoLb }, broker, logger)
70+
return await routing.CreateClient(clientParameters with
71+
{
72+
Endpoint = endPointNoLb,
73+
ClientProvidedName = clientParameters.ClientProvidedName
74+
}, broker, logger)
7175
.ConfigureAwait(false);
7276
}
7377

7478
// here it means that there is a AddressResolver configuration
7579
// so there is a load-balancer or proxy we need to get the right connection
7680
// as first we try with the first node given from the LB
7781
var endPoint = clientParameters.AddressResolver.EndPoint;
78-
var client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, broker, logger)
82+
var client = await routing.CreateClient(clientParameters with
83+
{
84+
Endpoint = endPoint,
85+
ClientProvidedName = clientParameters.ClientProvidedName
86+
}, broker, logger)
7987
.ConfigureAwait(false);
8088

8189
var advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");
@@ -87,7 +95,11 @@ internal static async Task<IClient> LookupConnection(
8795
attemptNo++;
8896
await client.Close("advertised_host or advertised_port doesn't match").ConfigureAwait(false);
8997

90-
client = await routing.CreateClient(clientParameters with { Endpoint = endPoint }, broker, logger)
98+
client = await routing.CreateClient(clientParameters with
99+
{
100+
Endpoint = endPoint,
101+
ClientProvidedName = clientParameters.ClientProvidedName
102+
}, broker, logger)
91103
.ConfigureAwait(false);
92104

93105
advertisedHost = GetPropertyValue(client.ConnectionProperties, "advertised_host");

docs/ReliableClient/Program.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
ProducersPerConnection = 2,
1111
ConsumersPerConnection = 2,
1212
Host = "Node0",
13-
Port = 5562,
14-
// LoadBalancer = true,
13+
Port = 5553,
14+
LoadBalancer = true,
1515
SuperStream = true,
16-
Streams = 1,
17-
Producers = 2,
16+
Streams = 3,
17+
Producers = 4,
1818
MessagesPerProducer = 50_000_000,
19-
Consumers = 3
19+
Consumers = 4
2020
// Username = "test",
2121
// Password = "test"
2222
});

docs/ReliableClient/RClient.cs

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static async Task Start(Config config)
4848
var loggerFactory = serviceCollection.BuildServiceProvider()
4949
.GetService<ILoggerFactory>();
5050

51+
5152
if (loggerFactory != null)
5253
{
5354
var lp = loggerFactory.CreateLogger<Producer>();
@@ -84,7 +85,7 @@ public static async Task Start(Config config)
8485

8586
if (config.LoadBalancer)
8687
{
87-
var resolver = new AddressResolver(new IPEndPoint(IPAddress.Parse(config.Host), config.Port));
88+
var resolver = new AddressResolver(ep);
8889
streamConf = new StreamSystemConfig()
8990
{
9091
AddressResolver = resolver,
@@ -114,7 +115,6 @@ public static async Task Start(Config config)
114115
var totalError = 0;
115116
var totalConsumed = 0;
116117
var totalSent = 0;
117-
var totalSentToSuperStream = 0;
118118
var isRunning = true;
119119

120120
_ = Task.Run(() =>
@@ -124,12 +124,11 @@ public static async Task Start(Config config)
124124
Console.WriteLine(
125125
$"When: {DateTime.Now}, " +
126126
$"Tr {System.Diagnostics.Process.GetCurrentProcess().Threads.Count}, " +
127+
$"Sent: {totalSent:#,##0.00}, " +
127128
$"Conf: {totalConfirmed:#,##0.00}, " +
128129
$"Error: {totalError:#,##0.00}, " +
129130
$"Total: {(totalConfirmed + totalError):#,##0.00}, " +
130131
$"Consumed: {totalConsumed:#,##0.00}, " +
131-
$"Sent: {totalSent:#,##0.00}, " +
132-
$"Sent To SuperStream: {totalSentToSuperStream:#,##0.00}, " +
133132
$"Sent per stream: {totalSent / streamsList.Count}");
134133
Thread.Sleep(5000);
135134
}
@@ -180,6 +179,11 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00
180179
await Consumer.Create(conf, lc).ConfigureAwait(false));
181180
}
182181

182+
async Task MaybeSend(Producer producer, Message message, ManualResetEvent publishEvent)
183+
{
184+
publishEvent.WaitOne();
185+
await producer.Send(message).ConfigureAwait(false);
186+
}
183187

184188
for (var z = 0; z < config.Producers; z++)
185189
{
@@ -215,12 +219,12 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00
215219
var streamInfo = status.Partition is not null
216220
? $" Partition {status.Partition} of super stream: {status.Stream}"
217221
: $"Stream: {status.Stream}";
218-
222+
219223
lp.LogInformation("Producer: {Id} - status changed from {From} to {To}. {Info}",
220-
status.Identifier,
224+
status.Identifier,
221225
status.From,
222226
status.To, streamInfo);
223-
227+
224228
if (status.To == ReliableEntityStatus.Open)
225229
{
226230
publishEvent.Set();
@@ -238,14 +242,13 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00
238242

239243
for (var i = 0; i < config.MessagesPerProducer; i++)
240244
{
241-
publishEvent.WaitOne();
242245
if (!unconfirmedMessages.IsEmpty)
243246
{
244247
var msgs = unconfirmedMessages.ToArray();
245248
unconfirmedMessages.Clear();
246249
foreach (var msg in msgs)
247250
{
248-
await producer.Send(msg).ConfigureAwait(false);
251+
await MaybeSend(producer, msg, publishEvent).ConfigureAwait(false);
249252
Interlocked.Increment(ref totalSent);
250253
}
251254
}
@@ -254,8 +257,7 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00
254257
{
255258
Properties = new Properties() {MessageId = $"hello{i}"}
256259
};
257-
await producer.Send(message).ConfigureAwait(false);
258-
await Task.Delay(10).ConfigureAwait(false);
260+
await MaybeSend(producer, message, publishEvent).ConfigureAwait(false);
259261
Interlocked.Increment(ref totalSent);
260262
}
261263
});
@@ -270,6 +272,9 @@ await system.CreateStream(new StreamSpec(stream) {MaxLengthBytes = 30_000_000_00
270272
producersList.ForEach(async p => await p.Close().ConfigureAwait(false));
271273
Console.WriteLine("closing the consumers ..... ");
272274
consumersList.ForEach(async c => await c.Close().ConfigureAwait(false));
275+
276+
Console.WriteLine("Press any key to close all");
277+
Console.ReadKey();
273278
}
274279

275280
Console.WriteLine("Closed all the consumers and producers");

0 commit comments

Comments
 (0)