Skip to content

Commit 847d0c6

Browse files
Oleh DokukaOlegDokuka
authored andcommitted
adds tests for WeightedLoadbalanceStrategy
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 16fdb87 commit 847d0c6

File tree

4 files changed

+260
-2
lines changed

4 files changed

+260
-2
lines changed

rsocket-core/src/main/java/io/rsocket/loadbalance/FrugalQuantile.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ public synchronized void insert(double x) {
6565
estimate = x;
6666
sign = 1;
6767
} else {
68-
double v = rnd.nextDouble();
68+
final double v = rnd.nextDouble();
69+
final double estimate = this.estimate;
6970

7071
if (x > estimate && v > (1 - quantile)) {
7172
higher(x);
@@ -76,6 +77,8 @@ public synchronized void insert(double x) {
7677
}
7778

7879
private void higher(double x) {
80+
double estimate = this.estimate;
81+
7982
step += sign * increment;
8083

8184
if (step > 0) {
@@ -94,9 +97,13 @@ private void higher(double x) {
9497
}
9598

9699
sign = 1;
100+
101+
this.estimate = estimate;
97102
}
98103

99104
private void lower(double x) {
105+
double estimate = this.estimate;
106+
100107
step -= sign * increment;
101108

102109
if (step > 0) {
@@ -115,6 +122,8 @@ private void lower(double x) {
115122
}
116123

117124
sign = -1;
125+
126+
this.estimate = estimate;
118127
}
119128

120129
@Override

rsocket-core/src/main/java/io/rsocket/loadbalance/Median.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public synchronized void insert(double x) {
3333
estimate = x;
3434
sign = 1;
3535
} else {
36+
final double estimate = this.estimate;
3637
if (x > estimate) {
3738
greaterThanZero(x);
3839
} else if (x < estimate) {
@@ -42,6 +43,8 @@ public synchronized void insert(double x) {
4243
}
4344

4445
private void greaterThanZero(double x) {
46+
double estimate = this.estimate;
47+
4548
step += sign;
4649

4750
if (step > 0) {
@@ -60,9 +63,13 @@ private void greaterThanZero(double x) {
6063
}
6164

6265
sign = 1;
66+
67+
this.estimate = estimate;
6368
}
6469

6570
private void lessThanZero(double x) {
71+
double estimate = this.estimate;
72+
6673
step -= sign;
6774

6875
if (step > 0) {
@@ -81,6 +88,8 @@ private void lessThanZero(double x) {
8188
}
8289

8390
sign = -1;
91+
92+
this.estimate = estimate;
8493
}
8594

8695
@Override

rsocket-core/src/main/java/io/rsocket/loadbalance/WeightedLoadbalanceStrategy.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,10 @@ public RSocket select(List<RSocket> sockets) {
124124

125125
private static double algorithmicWeight(
126126
RSocket rSocket, @Nullable final WeightedStats weightedStats) {
127-
if (weightedStats == null || rSocket.isDisposed() || rSocket.availability() == 0.0) {
127+
if (weightedStats == null) {
128+
return 1.0;
129+
}
130+
if (rSocket.isDisposed() || rSocket.availability() == 0.0) {
128131
return 0.0;
129132
}
130133
final int pending = weightedStats.pending();
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
package io.rsocket.loadbalance;
2+
3+
import io.rsocket.Payload;
4+
import io.rsocket.RSocket;
5+
import io.rsocket.RaceTestConstants;
6+
import io.rsocket.core.RSocketConnector;
7+
import io.rsocket.transport.ClientTransport;
8+
import io.rsocket.util.Clock;
9+
import io.rsocket.util.EmptyPayload;
10+
import java.util.Arrays;
11+
import java.util.Collections;
12+
import java.util.List;
13+
import java.util.concurrent.atomic.AtomicInteger;
14+
import org.assertj.core.api.Assertions;
15+
import org.assertj.core.data.Offset;
16+
import org.junit.jupiter.api.AfterAll;
17+
import org.junit.jupiter.api.BeforeEach;
18+
import org.junit.jupiter.api.Test;
19+
import org.mockito.Mockito;
20+
import reactor.core.publisher.Hooks;
21+
import reactor.core.publisher.Mono;
22+
import reactor.core.publisher.Sinks;
23+
import reactor.test.publisher.TestPublisher;
24+
25+
public class WeightedLoadbalanceStrategyTest {
26+
27+
@BeforeEach
28+
void setUp() {
29+
Hooks.onErrorDropped((__) -> {});
30+
}
31+
32+
@AfterAll
33+
static void afterAll() {
34+
Hooks.resetOnErrorDropped();
35+
}
36+
37+
@Test
38+
public void allRequestsShouldGoToTheSocketWithHigherWeight() {
39+
final AtomicInteger counter1 = new AtomicInteger();
40+
final AtomicInteger counter2 = new AtomicInteger();
41+
final ClientTransport mockTransport = Mockito.mock(ClientTransport.class);
42+
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
43+
final WeightedTestRSocket rSocket1 =
44+
new WeightedTestRSocket(
45+
new RSocket() {
46+
@Override
47+
public Mono<Void> fireAndForget(Payload payload) {
48+
counter1.incrementAndGet();
49+
return Mono.empty();
50+
}
51+
});
52+
final WeightedTestRSocket rSocket2 =
53+
new WeightedTestRSocket(
54+
new RSocket() {
55+
@Override
56+
public Mono<Void> fireAndForget(Payload payload) {
57+
counter2.incrementAndGet();
58+
return Mono.empty();
59+
}
60+
});
61+
Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
62+
.then(im -> Mono.just(rSocket1))
63+
.then(im -> Mono.just(rSocket2));
64+
65+
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
66+
final RSocketPool rSocketPool =
67+
new RSocketPool(
68+
rSocketConnectorMock,
69+
source,
70+
WeightedLoadbalanceStrategy.builder()
71+
.weightedStatsResolver(r -> r instanceof WeightedStats ? (WeightedStats) r : null)
72+
.build());
73+
74+
for (int j = 0; j < RaceTestConstants.REPEATS; j++) {
75+
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
76+
}
77+
78+
source.next(
79+
Arrays.asList(
80+
LoadbalanceTarget.from("1", mockTransport),
81+
LoadbalanceTarget.from("2", mockTransport)));
82+
83+
Assertions.assertThat(counter1.get()).isCloseTo(1000, Offset.offset(1));
84+
Assertions.assertThat(counter2.get()).isCloseTo(0, Offset.offset(1));
85+
}
86+
87+
@Test
88+
public void shouldDeliverValuesToTheSocketWithTheHighestCalculatedWeight() {
89+
final AtomicInteger counter1 = new AtomicInteger();
90+
final AtomicInteger counter2 = new AtomicInteger();
91+
final ClientTransport mockTransport1 = Mockito.mock(ClientTransport.class);
92+
final ClientTransport mockTransport2 = Mockito.mock(ClientTransport.class);
93+
final RSocketConnector rSocketConnectorMock = Mockito.mock(RSocketConnector.class);
94+
final WeightedTestRSocket rSocket1 =
95+
new WeightedTestRSocket(
96+
new RSocket() {
97+
@Override
98+
public Mono<Void> fireAndForget(Payload payload) {
99+
counter1.incrementAndGet();
100+
return Mono.empty();
101+
}
102+
});
103+
final WeightedTestRSocket rSocket2 =
104+
new WeightedTestRSocket(
105+
new RSocket() {
106+
@Override
107+
public Mono<Void> fireAndForget(Payload payload) {
108+
counter1.incrementAndGet();
109+
return Mono.empty();
110+
}
111+
});
112+
final WeightedTestRSocket rSocket3 =
113+
new WeightedTestRSocket(
114+
new RSocket() {
115+
@Override
116+
public Mono<Void> fireAndForget(Payload payload) {
117+
counter2.incrementAndGet();
118+
return Mono.empty();
119+
}
120+
});
121+
122+
Mockito.when(rSocketConnectorMock.connect(Mockito.any(ClientTransport.class)))
123+
.then(im -> Mono.just(rSocket1))
124+
.then(im -> Mono.just(rSocket2))
125+
.then(im -> Mono.just(rSocket3));
126+
127+
final TestPublisher<List<LoadbalanceTarget>> source = TestPublisher.create();
128+
final RSocketPool rSocketPool =
129+
new RSocketPool(
130+
rSocketConnectorMock,
131+
source,
132+
WeightedLoadbalanceStrategy.builder()
133+
.weightedStatsResolver(r -> r instanceof WeightedStats ? (WeightedStats) r : null)
134+
.build());
135+
136+
for (int j = 0; j < RaceTestConstants.REPEATS; j++) {
137+
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
138+
}
139+
140+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
141+
142+
Assertions.assertThat(counter1.get()).isCloseTo(RaceTestConstants.REPEATS, Offset.offset(1));
143+
144+
source.next(Collections.emptyList());
145+
146+
for (int j = 0; j < RaceTestConstants.REPEATS; j++) {
147+
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
148+
}
149+
150+
rSocket1.updateAvailability(0.0);
151+
152+
source.next(Collections.singletonList(LoadbalanceTarget.from("1", mockTransport1)));
153+
154+
Assertions.assertThat(counter1.get())
155+
.isCloseTo(RaceTestConstants.REPEATS * 2, Offset.offset(1));
156+
157+
source.next(
158+
Arrays.asList(
159+
LoadbalanceTarget.from("1", mockTransport1),
160+
LoadbalanceTarget.from("2", mockTransport2)));
161+
162+
for (int j = 0; j < RaceTestConstants.REPEATS; j++) {
163+
final RSocket rSocket = rSocketPool.select();
164+
rSocket.fireAndForget(EmptyPayload.INSTANCE).subscribe();
165+
}
166+
167+
Assertions.assertThat(counter1.get())
168+
.isCloseTo(RaceTestConstants.REPEATS * 3, Offset.offset(100));
169+
Assertions.assertThat(counter2.get()).isCloseTo(0, Offset.offset(100));
170+
171+
rSocket2.updateAvailability(0.0);
172+
173+
source.next(Collections.singletonList(LoadbalanceTarget.from("2", mockTransport1)));
174+
175+
for (int j = 0; j < RaceTestConstants.REPEATS; j++) {
176+
rSocketPool.select().fireAndForget(EmptyPayload.INSTANCE).subscribe();
177+
}
178+
179+
Assertions.assertThat(counter1.get())
180+
.isCloseTo(RaceTestConstants.REPEATS * 3, Offset.offset(100));
181+
Assertions.assertThat(counter2.get()).isCloseTo(RaceTestConstants.REPEATS, Offset.offset(100));
182+
183+
source.next(
184+
Arrays.asList(
185+
LoadbalanceTarget.from("1", mockTransport1),
186+
LoadbalanceTarget.from("2", mockTransport2)));
187+
188+
for (int j = 0; j < RaceTestConstants.REPEATS; j++) {
189+
final RSocket rSocket = rSocketPool.select();
190+
rSocket.fireAndForget(EmptyPayload.INSTANCE).subscribe();
191+
}
192+
193+
Assertions.assertThat(counter1.get())
194+
.isCloseTo(RaceTestConstants.REPEATS * 3, Offset.offset(100));
195+
Assertions.assertThat(counter2.get())
196+
.isCloseTo(RaceTestConstants.REPEATS * 2, Offset.offset(100));
197+
}
198+
199+
static class WeightedTestRSocket extends BaseWeightedStats implements RSocket {
200+
201+
final Sinks.Empty<Void> sink = Sinks.empty();
202+
203+
final RSocket rSocket;
204+
205+
public WeightedTestRSocket(RSocket rSocket) {
206+
this.rSocket = rSocket;
207+
}
208+
209+
@Override
210+
public Mono<Void> fireAndForget(Payload payload) {
211+
startRequest();
212+
final long startTime = Clock.now();
213+
return this.rSocket
214+
.fireAndForget(payload)
215+
.doFinally(
216+
__ -> {
217+
stopRequest(startTime);
218+
record(Clock.now() - startTime);
219+
updateAvailability(1.0);
220+
});
221+
}
222+
223+
@Override
224+
public Mono<Void> onClose() {
225+
return sink.asMono();
226+
}
227+
228+
@Override
229+
public void dispose() {
230+
sink.tryEmitEmpty();
231+
}
232+
233+
public RSocket source() {
234+
return rSocket;
235+
}
236+
}
237+
}

0 commit comments

Comments
 (0)