Skip to content

Commit 7e6330d

Browse files
committed
improves tests
Signed-off-by: Oleh Dokuka <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 0601f88 commit 7e6330d

File tree

2 files changed

+154
-86
lines changed

2 files changed

+154
-86
lines changed
Lines changed: 125 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2015-2018 the original author or authors.
2+
* Copyright 2015-present the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,115 +16,156 @@
1616

1717
package io.rsocket.internal;
1818

19-
import io.rsocket.Payload;
20-
import io.rsocket.util.ByteBufPayload;
21-
import io.rsocket.util.EmptyPayload;
22-
import java.util.concurrent.CountDownLatch;
23-
import org.junit.Assert;
24-
import org.junit.Test;
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.buffer.ByteBufAllocator;
23+
import io.netty.buffer.Unpooled;
24+
import io.netty.util.CharsetUtil;
25+
import io.netty.util.ReferenceCountUtil;
26+
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
27+
import io.rsocket.internal.subscriber.AssertSubscriber;
28+
import org.junit.jupiter.api.RepeatedTest;
29+
import org.junit.jupiter.api.Timeout;
30+
import org.junit.jupiter.params.ParameterizedTest;
31+
import org.junit.jupiter.params.provider.ValueSource;
32+
import reactor.core.Fuseable;
33+
import reactor.core.publisher.Operators;
34+
import reactor.core.scheduler.Schedulers;
35+
import reactor.test.StepVerifier;
36+
import reactor.test.util.RaceTestUtils;
2537

2638
public class UnboundedProcessorTest {
27-
@Test
28-
public void testOnNextBeforeSubscribe_10() {
29-
testOnNextBeforeSubscribeN(10);
30-
}
31-
32-
@Test
33-
public void testOnNextBeforeSubscribe_100() {
34-
testOnNextBeforeSubscribeN(100);
35-
}
36-
37-
@Test
38-
public void testOnNextBeforeSubscribe_10_000() {
39-
testOnNextBeforeSubscribeN(10_000);
40-
}
41-
42-
@Test
43-
public void testOnNextBeforeSubscribe_100_000() {
44-
testOnNextBeforeSubscribeN(100_000);
45-
}
46-
47-
@Test
48-
public void testOnNextBeforeSubscribe_1_000_000() {
49-
testOnNextBeforeSubscribeN(1_000_000);
50-
}
51-
52-
@Test
53-
public void testOnNextBeforeSubscribe_10_000_000() {
54-
testOnNextBeforeSubscribeN(10_000_000);
55-
}
5639

40+
@ParameterizedTest(
41+
name =
42+
"Test that emitting {0} onNext before subscribe and requestN should deliver all the signals once the subscriber is available")
43+
@ValueSource(ints = {10, 100, 10_000, 100_000, 1_000_000, 10_000_000})
5744
public void testOnNextBeforeSubscribeN(int n) {
58-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
45+
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
5946

6047
for (int i = 0; i < n; i++) {
61-
processor.onNext(EmptyPayload.INSTANCE);
48+
processor.onNext(Unpooled.EMPTY_BUFFER);
6249
}
6350

6451
processor.onComplete();
6552

66-
long count = processor.count().block();
67-
68-
Assert.assertEquals(n, count);
69-
}
70-
71-
@Test
72-
public void testOnNextAfterSubscribe_10() throws Exception {
73-
testOnNextAfterSubscribeN(10);
74-
}
75-
76-
@Test
77-
public void testOnNextAfterSubscribe_100() throws Exception {
78-
testOnNextAfterSubscribeN(100);
53+
StepVerifier.create(processor.count()).expectNext(Long.valueOf(n)).verifyComplete();
7954
}
8055

81-
@Test
82-
public void testOnNextAfterSubscribe_1000() throws Exception {
83-
testOnNextAfterSubscribeN(1000);
84-
}
56+
@ParameterizedTest(
57+
name =
58+
"Test that emitting {0} onNext after subscribe and requestN should deliver all the signals")
59+
@ValueSource(ints = {10, 100, 10_000})
60+
public void testOnNextAfterSubscribeN(int n) {
61+
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
62+
AssertSubscriber<ByteBuf> assertSubscriber = AssertSubscriber.create();
8563

86-
@Test
87-
public void testPrioritizedSending() {
88-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
64+
processor.subscribe(assertSubscriber);
8965

90-
for (int i = 0; i < 1000; i++) {
91-
processor.onNext(EmptyPayload.INSTANCE);
66+
for (int i = 0; i < n; i++) {
67+
processor.onNext(Unpooled.EMPTY_BUFFER);
9268
}
9369

94-
processor.onNextPrioritized(ByteBufPayload.create("test"));
95-
96-
Payload closestPayload = processor.next().block();
97-
98-
Assert.assertEquals(closestPayload.getDataUtf8(), "test");
70+
assertSubscriber.awaitAndAssertNextValueCount(n);
9971
}
10072

101-
@Test
102-
public void testPrioritizedFused() {
103-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
73+
@ParameterizedTest(
74+
name =
75+
"Test that prioritized value sending deliver prioritized signals before the others mode[fusionEnabled={0}]")
76+
@ValueSource(booleans = {true, false})
77+
public void testPrioritizedSending(boolean fusedCase) {
78+
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
10479

10580
for (int i = 0; i < 1000; i++) {
106-
processor.onNext(EmptyPayload.INSTANCE);
81+
processor.onNext(Unpooled.EMPTY_BUFFER);
10782
}
10883

109-
processor.onNextPrioritized(ByteBufPayload.create("test"));
110-
111-
Payload closestPayload = processor.poll();
84+
processor.onNextPrioritized(Unpooled.copiedBuffer("test", CharsetUtil.UTF_8));
11285

113-
Assert.assertEquals(closestPayload.getDataUtf8(), "test");
86+
assertThat(fusedCase ? processor.poll() : processor.next().block())
87+
.isNotNull()
88+
.extracting(bb -> bb.toString(CharsetUtil.UTF_8))
89+
.isEqualTo("test");
11490
}
11591

116-
public void testOnNextAfterSubscribeN(int n) throws Exception {
117-
CountDownLatch latch = new CountDownLatch(n);
118-
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
119-
processor.log().doOnNext(integer -> latch.countDown()).subscribe();
120-
121-
for (int i = 0; i < n; i++) {
122-
System.out.println("onNexting -> " + i);
123-
processor.onNext(EmptyPayload.INSTANCE);
92+
@ParameterizedTest(
93+
name =
94+
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks; mode[fusionEnabled={0}]")
95+
@ValueSource(booleans = {true, false})
96+
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
97+
final LeaksTrackingByteBufAllocator allocator =
98+
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
99+
for (int i = 0; i < 10000000; i++) {
100+
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();
101+
102+
final ByteBuf buffer1 = allocator.buffer(1);
103+
final ByteBuf buffer2 = allocator.buffer(2);
104+
105+
final AssertSubscriber<ByteBuf> assertSubscriber =
106+
new AssertSubscriber<ByteBuf>(0)
107+
.requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
108+
109+
unboundedProcessor.subscribe(assertSubscriber);
110+
111+
RaceTestUtils.race(
112+
() ->
113+
RaceTestUtils.race(
114+
() ->
115+
RaceTestUtils.race(
116+
() -> {
117+
unboundedProcessor.onNext(buffer1);
118+
unboundedProcessor.onNext(buffer2);
119+
},
120+
unboundedProcessor::dispose,
121+
Schedulers.elastic()),
122+
assertSubscriber::cancel,
123+
Schedulers.elastic()),
124+
() -> {
125+
assertSubscriber.request(1);
126+
assertSubscriber.request(1);
127+
},
128+
Schedulers.elastic());
129+
130+
assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
131+
132+
allocator.assertHasNoLeaks();
124133
}
134+
}
125135

126-
processor.drain();
127-
128-
latch.await();
136+
@RepeatedTest(
137+
name =
138+
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks",
139+
value = 100000)
140+
@Timeout(10)
141+
public void ensureUnboundedProcessorDisposesQueueProperlyAsyncMode() {
142+
final LeaksTrackingByteBufAllocator allocator =
143+
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
144+
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();
145+
146+
final ByteBuf buffer1 = allocator.buffer(1);
147+
final ByteBuf buffer2 = allocator.buffer(2);
148+
149+
final AssertSubscriber<ByteBuf> assertSubscriber =
150+
new AssertSubscriber<>(Operators.enableOnDiscard(null, ReferenceCountUtil::safeRelease));
151+
152+
unboundedProcessor.publishOn(Schedulers.parallel()).subscribe(assertSubscriber);
153+
154+
RaceTestUtils.race(
155+
() -> {
156+
unboundedProcessor.onNext(buffer1);
157+
unboundedProcessor.onNext(buffer2);
158+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
159+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
160+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
161+
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
162+
unboundedProcessor.dispose();
163+
},
164+
unboundedProcessor::dispose,
165+
Schedulers.elastic());
166+
167+
assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
168+
169+
allocator.assertHasNoLeaks();
129170
}
130171
}

rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.Set;
2727
import java.util.concurrent.CountDownLatch;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2930
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
3031
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3132
import java.util.function.BooleanSupplier;
@@ -86,6 +87,10 @@ public class AssertSubscriber<T> implements CoreSubscriber<T>, Subscription {
8687
private static final AtomicLongFieldUpdater<AssertSubscriber> REQUESTED =
8788
AtomicLongFieldUpdater.newUpdater(AssertSubscriber.class, "requested");
8889

90+
@SuppressWarnings("rawtypes")
91+
private static final AtomicIntegerFieldUpdater<AssertSubscriber> WIP =
92+
AtomicIntegerFieldUpdater.newUpdater(AssertSubscriber.class, "wip");
93+
8994
@SuppressWarnings("rawtypes")
9095
private static final AtomicReferenceFieldUpdater<AssertSubscriber, List> NEXT_VALUES =
9196
AtomicReferenceFieldUpdater.newUpdater(AssertSubscriber.class, List.class, "values");
@@ -104,6 +109,8 @@ public class AssertSubscriber<T> implements CoreSubscriber<T>, Subscription {
104109

105110
volatile long requested;
106111

112+
volatile int wip;
113+
107114
volatile List<T> values = new LinkedList<>();
108115

109116
/** The fusion mode to request. */
@@ -377,7 +384,7 @@ public final AssertSubscriber<T> assertError(Class<? extends Throwable> clazz) {
377384
}
378385
}
379386
if (s > 1) {
380-
throw new AssertionError("Multiple errors: " + s, null);
387+
throw new AssertionError("Multiple errors: " + errors, null);
381388
}
382389
return this;
383390
}
@@ -854,6 +861,10 @@ public void cancel() {
854861
a = S.getAndSet(this, Operators.cancelledSubscription());
855862
if (a != null && a != Operators.cancelledSubscription()) {
856863
a.cancel();
864+
865+
if (establishedFusionMode == Fuseable.ASYNC && WIP.getAndIncrement(this) == 0) {
866+
qs.clear();
867+
}
857868
}
858869
}
859870
}
@@ -881,10 +892,26 @@ public void onError(Throwable t) {
881892
@Override
882893
public void onNext(T t) {
883894
if (establishedFusionMode == Fuseable.ASYNC) {
895+
if (this.wip != 0 || WIP.getAndIncrement(this) != 0) {
896+
if (isCancelled()) {
897+
qs.clear();
898+
}
899+
return;
900+
}
901+
902+
int m = 0;
884903
for (; ; ) {
904+
if (isCancelled()) {
905+
qs.clear();
906+
break;
907+
}
885908
t = qs.poll();
886909
if (t == null) {
887-
break;
910+
m = WIP.addAndGet(this, -m);
911+
if (m == 0) {
912+
break;
913+
}
914+
continue;
888915
}
889916
valueCount++;
890917
if (valuesStorage) {

0 commit comments

Comments
 (0)