Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.rsocket.internal.jctools.queues.MpscUnboundedArrayQueue;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscriber;
Expand Down Expand Up @@ -55,6 +56,8 @@ public final class UnboundedProcessor<T> extends FluxProcessor<T, T>

volatile boolean cancelled;

volatile boolean terminated;

volatile int once;

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -124,6 +127,9 @@ void drainRegular(Subscriber<? super T> a) {
}

if (checkTerminated(d, empty, a)) {
if (!empty) {
release(t);
}
return;
}

Expand Down Expand Up @@ -159,7 +165,9 @@ void drainFused(Subscriber<? super T> a) {
for (; ; ) {

if (cancelled) {
this.clear();
if (terminated) {
this.clear();
}
hasDownstream = false;
return;
}
Expand Down Expand Up @@ -189,7 +197,7 @@ void drainFused(Subscriber<? super T> a) {

public void drain() {
if (WIP.getAndIncrement(this) != 0) {
if (cancelled) {
if ((!outputFused && cancelled) || terminated) {
this.clear();
}
return;
Expand Down Expand Up @@ -350,7 +358,9 @@ public void cancel() {
cancelled = true;

if (WIP.getAndIncrement(this) == 0) {
this.clear();
if (!outputFused || terminated) {
this.clear();
}
hasDownstream = false;
}
}
Expand All @@ -377,24 +387,20 @@ public boolean isEmpty() {

@Override
public void clear() {
terminated = true;
if (DISCARD_GUARD.getAndIncrement(this) != 0) {
return;
}

int missed = 1;

for (; ; ) {
while (!queue.isEmpty()) {
T t = queue.poll();
if (t != null) {
release(t);
}
T t;
while ((t = queue.poll()) != null) {
release(t);
}
while (!priorityQueue.isEmpty()) {
T t = priorityQueue.poll();
if (t != null) {
release(t);
}
while ((t = priorityQueue.poll()) != null) {
release(t);
}

missed = DISCARD_GUARD.addAndGet(this, -missed);
Expand All @@ -415,7 +421,43 @@ public int requestFusion(int requestedMode) {

@Override
public void dispose() {
cancel();
if (cancelled) {
return;
}

error = new CancellationException("Disposed");
done = true;

boolean once = true;
if (WIP.getAndIncrement(this) == 0) {
cancelled = true;
int m = 1;
for (; ; ) {
final CoreSubscriber<? super T> a = this.actual;

if (!outputFused || terminated) {
clear();
}

if (a != null && once) {
try {
a.onError(error);
} catch (Throwable ignored) {
}
}

cancelled = true;
once = false;

int wip = this.wip;
if (wip == m) {
break;
}
m = wip;
}

hasDownstream = false;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-present the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,115 +16,173 @@

package io.rsocket.internal;

import io.rsocket.Payload;
import io.rsocket.util.ByteBufPayload;
import io.rsocket.util.EmptyPayload;
import java.util.concurrent.CountDownLatch;
import org.junit.Assert;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import io.rsocket.buffer.LeaksTrackingByteBufAllocator;
import io.rsocket.internal.subscriber.AssertSubscriber;
import java.time.Duration;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.util.RaceTestUtils;

public class UnboundedProcessorTest {
@Test
public void testOnNextBeforeSubscribe_10() {
testOnNextBeforeSubscribeN(10);
}

@Test
public void testOnNextBeforeSubscribe_100() {
testOnNextBeforeSubscribeN(100);
}

@Test
public void testOnNextBeforeSubscribe_10_000() {
testOnNextBeforeSubscribeN(10_000);
@BeforeAll
public static void setup() {
Hooks.onErrorDropped(__ -> {});
}

@Test
public void testOnNextBeforeSubscribe_100_000() {
testOnNextBeforeSubscribeN(100_000);
}

@Test
public void testOnNextBeforeSubscribe_1_000_000() {
testOnNextBeforeSubscribeN(1_000_000);
}

@Test
public void testOnNextBeforeSubscribe_10_000_000() {
testOnNextBeforeSubscribeN(10_000_000);
public static void teardown() {
Hooks.resetOnErrorDropped();
}

@ParameterizedTest(
name =
"Test that emitting {0} onNext before subscribe and requestN should deliver all the signals once the subscriber is available")
@ValueSource(ints = {10, 100, 10_000, 100_000, 1_000_000, 10_000_000})
public void testOnNextBeforeSubscribeN(int n) {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();

for (int i = 0; i < n; i++) {
processor.onNext(EmptyPayload.INSTANCE);
processor.onNext(Unpooled.EMPTY_BUFFER);
}

processor.onComplete();

long count = processor.count().block();

Assert.assertEquals(n, count);
}

@Test
public void testOnNextAfterSubscribe_10() throws Exception {
testOnNextAfterSubscribeN(10);
}

@Test
public void testOnNextAfterSubscribe_100() throws Exception {
testOnNextAfterSubscribeN(100);
StepVerifier.create(processor.count()).expectNext(Long.valueOf(n)).verifyComplete();
}

@Test
public void testOnNextAfterSubscribe_1000() throws Exception {
testOnNextAfterSubscribeN(1000);
}
@ParameterizedTest(
name =
"Test that emitting {0} onNext after subscribe and requestN should deliver all the signals")
@ValueSource(ints = {10, 100, 10_000})
public void testOnNextAfterSubscribeN(int n) {
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();
AssertSubscriber<ByteBuf> assertSubscriber = AssertSubscriber.create();

@Test
public void testPrioritizedSending() {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
processor.subscribe(assertSubscriber);

for (int i = 0; i < 1000; i++) {
processor.onNext(EmptyPayload.INSTANCE);
for (int i = 0; i < n; i++) {
processor.onNext(Unpooled.EMPTY_BUFFER);
}

processor.onNextPrioritized(ByteBufPayload.create("test"));

Payload closestPayload = processor.next().block();

Assert.assertEquals(closestPayload.getDataUtf8(), "test");
assertSubscriber.awaitAndAssertNextValueCount(n);
}

@Test
public void testPrioritizedFused() {
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
@ParameterizedTest(
name =
"Test that prioritized value sending deliver prioritized signals before the others mode[fusionEnabled={0}]")
@ValueSource(booleans = {true, false})
public void testPrioritizedSending(boolean fusedCase) {
UnboundedProcessor<ByteBuf> processor = new UnboundedProcessor<>();

for (int i = 0; i < 1000; i++) {
processor.onNext(EmptyPayload.INSTANCE);
processor.onNext(Unpooled.EMPTY_BUFFER);
}

processor.onNextPrioritized(ByteBufPayload.create("test"));
processor.onNextPrioritized(Unpooled.copiedBuffer("test", CharsetUtil.UTF_8));

Payload closestPayload = processor.poll();

Assert.assertEquals(closestPayload.getDataUtf8(), "test");
assertThat(fusedCase ? processor.poll() : processor.next().block())
.isNotNull()
.extracting(bb -> bb.toString(CharsetUtil.UTF_8))
.isEqualTo("test");
}

public void testOnNextAfterSubscribeN(int n) throws Exception {
CountDownLatch latch = new CountDownLatch(n);
UnboundedProcessor<Payload> processor = new UnboundedProcessor<>();
processor.log().doOnNext(integer -> latch.countDown()).subscribe();

for (int i = 0; i < n; i++) {
System.out.println("onNexting -> " + i);
processor.onNext(EmptyPayload.INSTANCE);
@ParameterizedTest(
name =
"Ensures that racing between onNext | dispose | cancel | request(n) will not cause any issues and leaks; mode[fusionEnabled={0}]")
@ValueSource(booleans = {true, false})
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
final LeaksTrackingByteBufAllocator allocator =
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
for (int i = 0; i < 100000; i++) {
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();

final ByteBuf buffer1 = allocator.buffer(1);
final ByteBuf buffer2 = allocator.buffer(2);

final AssertSubscriber<ByteBuf> assertSubscriber =
new AssertSubscriber<ByteBuf>(0)
.requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);

unboundedProcessor.subscribe(assertSubscriber);

RaceTestUtils.race(
() ->
RaceTestUtils.race(
() ->
RaceTestUtils.race(
() -> {
unboundedProcessor.onNext(buffer1);
unboundedProcessor.onNext(buffer2);
},
unboundedProcessor::dispose,
Schedulers.elastic()),
assertSubscriber::cancel,
Schedulers.elastic()),
() -> {
assertSubscriber.request(1);
assertSubscriber.request(1);
},
Schedulers.elastic());

assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);

allocator.assertHasNoLeaks();
}
}

processor.drain();

latch.await();
@RepeatedTest(
name =
"Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks",
value = 100000)
@Timeout(60)
public void ensuresAsyncFusionAndDisposureHasNoDeadlock() {
// TODO: enable leaks tracking
// final LeaksTrackingByteBufAllocator allocator =
// LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
final UnboundedProcessor<ByteBuf> unboundedProcessor = new UnboundedProcessor<>();

// final ByteBuf buffer1 = allocator.buffer(1);
// final ByteBuf buffer2 = allocator.buffer(2);

final AssertSubscriber<ByteBuf> assertSubscriber =
new AssertSubscriber<>(Operators.enableOnDiscard(null, ReferenceCountUtil::safeRelease));

unboundedProcessor.publishOn(Schedulers.parallel()).subscribe(assertSubscriber);

RaceTestUtils.race(
() -> {
// unboundedProcessor.onNext(buffer1);
// unboundedProcessor.onNext(buffer2);
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
unboundedProcessor.dispose();
},
unboundedProcessor::dispose);

assertSubscriber
.await(Duration.ofSeconds(50))
.values()
.forEach(ReferenceCountUtil::safeRelease);

// allocator.assertHasNoLeaks();
}
}
Loading