diff --git a/.github/workflows/gradle-all.yml b/.github/workflows/gradle-all.yml
new file mode 100644
index 000000000..8540539bb
--- /dev/null
+++ b/.github/workflows/gradle-all.yml
@@ -0,0 +1,45 @@
+name: Branches Java CI
+
+on:
+ # Trigger the workflow on push
+ # but only for the non master/1.0.x branches
+ push:
+ branches-ignore:
+ - 1.0.x
+ - master
+
+jobs:
+ build:
+
+ runs-on: ${{ matrix.os }}
+
+ strategy:
+ matrix:
+ os: [ ubuntu-latest ]
+ jdk: [ 1.8, 11, 14 ]
+ fail-fast: false
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.jdk }}
+ uses: actions/setup-java@v1
+ with:
+ java-version: ${{ matrix.jdk }}
+ - name: Cache Gradle packages
+ uses: actions/cache@v1
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
+ restore-keys: ${{ runner.os }}-gradle
+ - name: Grant execute permission for gradlew
+ run: chmod +x gradlew
+ - name: Build with Gradle
+ run: ./gradlew clean build
+ - name: Publish Packages to Artifactory
+ if: ${{ matrix.jdk == '1.8' }}
+ run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PversionSuffix="-${githubRef#refs/heads/}-SNAPSHOT" -PbuildNumber="${buildNumber}" artifactoryPublish --stacktrace
+ env:
+ bintrayUser: ${{ secrets.bintrayUser }}
+ bintrayKey: ${{ secrets.bintrayKey }}
+ githubRef: ${{ github.ref }}
+ buildNumber: ${{ github.run_number }}
\ No newline at end of file
diff --git a/.github/workflows/gradle-main.yml b/.github/workflows/gradle-main.yml
new file mode 100644
index 000000000..d8ba3c3d5
--- /dev/null
+++ b/.github/workflows/gradle-main.yml
@@ -0,0 +1,53 @@
+name: Main Branches Java CI
+
+on:
+ # Trigger the workflow on push
+ # but only for the master/1.0.x branch
+ push:
+ branches:
+ - master
+ - 1.0.x
+
+jobs:
+ build:
+
+ runs-on: ${{ matrix.os }}
+
+ strategy:
+ matrix:
+ os: [ ubuntu-latest ]
+ jdk: [ 1.8, 11, 14 ]
+ fail-fast: false
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.jdk }}
+ uses: actions/setup-java@v1
+ with:
+ java-version: ${{ matrix.jdk }}
+ - name: Cache Gradle packages
+ uses: actions/cache@v1
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
+ restore-keys: ${{ runner.os }}-gradle
+ - name: Grant execute permission for gradlew
+ run: chmod +x gradlew
+ - name: Build with Gradle
+ run: ./gradlew clean build
+ - name: Publish Packages to Artifactory
+ if: ${{ matrix.jdk == '1.8' }}
+ run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PversionSuffix="-SNAPSHOT" -PbuildNumber="${buildNumber}" artifactoryPublish --stacktrace
+ env:
+ bintrayUser: ${{ secrets.bintrayUser }}
+ bintrayKey: ${{ secrets.bintrayKey }}
+ buildNumber: ${{ github.run_number }}
+ - name: Aggregate test reports with ciMate
+ if: always()
+ continue-on-error: true
+ env:
+ CIMATE_PROJECT_ID: m84qx17y
+ run: |
+ wget -q https://get.cimate.io/release/linux/cimate
+ chmod +x cimate
+ ./cimate "**/TEST-*.xml"
\ No newline at end of file
diff --git a/.github/workflows/gradle-pr.yml b/.github/workflows/gradle-pr.yml
new file mode 100644
index 000000000..994450faf
--- /dev/null
+++ b/.github/workflows/gradle-pr.yml
@@ -0,0 +1,31 @@
+name: Pull Request Java CI
+
+on: [pull_request]
+
+jobs:
+ build:
+
+ runs-on: ${{ matrix.os }}
+
+ strategy:
+ matrix:
+ os: [ ubuntu-latest ]
+ jdk: [ 1.8, 11, 14 ]
+ fail-fast: false
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK ${{ matrix.jdk }}
+ uses: actions/setup-java@v1
+ with:
+ java-version: ${{ matrix.jdk }}
+ - name: Cache Gradle packages
+ uses: actions/cache@v1
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
+ restore-keys: ${{ runner.os }}-gradle
+ - name: Grant execute permission for gradlew
+ run: chmod +x gradlew
+ - name: Build with Gradle
+ run: ./gradlew clean build
\ No newline at end of file
diff --git a/.github/workflows/gradle-release.yml b/.github/workflows/gradle-release.yml
new file mode 100644
index 000000000..08f2698dc
--- /dev/null
+++ b/.github/workflows/gradle-release.yml
@@ -0,0 +1,44 @@
+name: Release Java CI
+
+on:
+ # Trigger the workflow on push
+ push:
+ # Sequence of patterns matched against refs/tags
+ tags:
+ - '*' # Push events to matching *, i.e. 1.0, 20.15.10
+
+jobs:
+ publish:
+
+ runs-on: ${{ matrix.os }}
+
+ strategy:
+ matrix:
+ os: [ ubuntu-latest ]
+ fail-fast: false
+
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up JDK 1.8
+ uses: actions/setup-java@v1
+ with:
+ java-version: 1.8
+ - name: Cache Gradle packages
+ uses: actions/cache@v1
+ with:
+ path: ~/.gradle/caches
+ key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
+ restore-keys: ${{ runner.os }}-gradle
+ - name: Grant execute permission for gradlew
+ run: chmod +x gradlew
+ - name: Build with Gradle
+ run: ./gradlew clean build
+ - name: Publish Packages to Bintray
+ run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" -Pversion="${githubRef#refs/tags/}" -PbuildNumber="${buildNumber}" bintrayUpload
+ env:
+ bintrayUser: ${{ secrets.bintrayUser }}
+ bintrayKey: ${{ secrets.bintrayKey }}
+ sonatypeUsername: ${{ secrets.sonatypeUsername }}
+ sonatypePassword: ${{ secrets.sonatypePassword }}
+ githubRef: ${{ github.ref }}
+ buildNumber: ${{ github.run_number }}
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
deleted file mode 100644
index 4722957c8..000000000
--- a/.travis.yml
+++ /dev/null
@@ -1,45 +0,0 @@
-#
-# Copyright 2015-2018 the original author or authors.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
----
-language: java
-
-dist: trusty
-
-matrix:
- include:
- - jdk: openjdk8
- - jdk: openjdk11
- env: SKIP_RELEASE=true
- - jdk: openjdk14
- env: SKIP_RELEASE=true
-
-env:
- global:
- - secure: "WBCy0hsF96Xybj4n0AUrGY2m5FWCUa30XR+aVElSOO8d7v7BMypAT8mAd+yC2Y+j8WUGpIv59CqgeK1JrYdR9b3qRKhJmoE1Q92TotrxXMTIC9OKuU51LaaOqGYqx4SqiA2AyaikTFPd8um7KZfUpW/dG4IXySsiJ2OKT1jMUq6TmbWHnAYtjbl3u3WdjBQTIZNMtqG1+H1vIpsWyZrvbB4TWlNzhKBAu/YnlzMtvStrDaF7XrCJ2BQdMomQO18NH2gWxUEvLbQb6ip3wFl9CRe6vID7K1dmFwm08RPt9hRPC9yDahlIy8VvuNcWrP42TV+BVYy8V/hfaIo1pPsDBrtmVyc7YZjXSUM68orDFOkRB35qGkNIaAhy5Yt6G9QfwLXJkDFofW5KMKtDFUzf+j4DwS0CiDMF4k6Qq7YN1tYFXE9R8xa6Gv+wTNHqs4RURbYMS9IlbkhKxNbtyuema2sIUbsIfDezIzLI5BnfH2uli7O6/z0/G0Vfmf6A4q5Olm+7uhzMTI0GKheUIKr16SOxABlrwJtLJftzoKz9hYd3b7C9t61vYzccC3rWYobplwIcK2w50gFHQS8HLeiCjo8yjCx+IRSvAGaZIBPQdHCktrEYCVDUTXOxdaD6k6Ef+ppm8Nn+M+iC8x/G1wYE4x1lDqHw3GfhKsEQmiHL/98="
- - secure: "mbB+rv9eWUFQ9/yr2REH2ztH6r/Uq7cq/OJ5WK6yFp0TmPzlJ8jbEVwe/sdAMW2E4qrfMu1c2h3qsVm41pNx0MwEsIW/lTIZRiRmNYon32n+SHlRWyTn8dJeY/p1HoHs450OjLgB4X4jmRmfSt8IQ/w9ZCjF6HVcgR4ctt+myECTNcRidEIOahljnSJmnFFDsKbt2UJN96AfvvhbxcarEKgKLXLd9tQT2GlvEOM+hVOY9hKD5FvIoRp9heyCEAsSBXe+MIWQlh4jx+B4zCajZJ+8KN6M8KIt40lV8z4Zbc11jgq/xULJwkQIuVZvkJ3huIfUrxwLPgYWeai/TR/m3+2jy1hFajt96pnhJzFEz0IBL0wFALwAY1n2R/6uugEUYnDsFcGQGTsO5OeeOixiRPH5HNgfOhInqJoFh/887f+gq7OLXjlRCTsw+S9KknZ3iBpHX/+khurfAUC9khiMvufEq6Wyu0TvxhmGERFrs7uugeJ1VA85SDVQ6Au9MV831PeBGqzHpYG7w2kJj1EiFjBRUhCthxyDfX2b04egozlKF8JEifZ9EVj7pNMQUvVG2c9Wj6M0fG84NusnlZlA16XxAmfLevc9b/BOSSrqc2r9Z1ZvxFnBPP9H94Uqt9ZninhW/T49jRF+lQzD45MTVogzVk77XtdpzUemf4t5mHc="
- - secure: "GcPu3U4o2Dp7QLCqaAo3mGMJTl9yd+w+elXqqt7WDjrjm5p8mrzvQfyiJA7mRJVDTGpgib8fLctL1X1+QOX4fNKElrDUFhE3bWAqwVwHGPK4D3HCb6THD5XVqE4qcPmdLWPkvJ9ZY5nSIfuRVASjZTcc4XSXISK2jUSGar0PNYlo62/OFGvNvMz/qINU9RU7iYdDlL19yd72TKDfuK0UOKhQEGypamEHam3SMNCw/p8Q5K1vQe+Oba3ILCvYHJvqWc2NLjRXJjXfIaOq/NpCK6Lx2U9etdpkb5lyW5Cx1lkzIcRUq8ZUCwbkHog9LJoZGrZFh5AzlZ6kRuejBqu7AISmZy4s9HVAb7AQmNxvXkK9EIt8lavcaHnLYUIfuxvBqK/ptcUN5P/KXCs1DsbpADjB7YbUu/EQ2OAWncV31Z+O4uMHV29eGTtaz9LoK28+mHRfFHqoazWyuUejor6iSSkrCeqsLEvU8o6rH4oenKz7hLlZsJqHGACYtYNYi2CXYlTu0bMX+Hb1EtTu6Awm9Gn04TqVdmNexgF5CdqW4A696i6jlkPpVCt4B4nq4VPs2RMTkjVl3B7uOkDm18u35dncuhgsnMfVmo9cWX5COeyefdh6kdnKsUf0+IPbV/hix/OCP72dpuhxgcyzN+DvaVLzX7YOx7TpJTzPSKNEQZc="
- - secure: "UFJEzDEv6H2Qscg9UgZFVJq5oFvq7nQkVoSuGfh5Y4ZhL9PCK5f3Ft9oYEZOQwXaxWD1qivtJjQV3DdBiqsHkrnPrJ0hi3iYVDJo26xLNtu3welFw5Veqmgu2NuwjaDn6cjRFCJRLzpszMUWO1DvfLJTs3LuJDuXEyAKDw9eQgfOakqO4xeloyXgM7xnoXz11rgqtJNU6snjVPHftXNPTHGsNDlTR7SAIbjYwLMbdIKM2qjzrXkg+a94QOz2stnTDz9V5iYNH+3XXCcYxD9nb1Ol1XGWvtDnNGEhtGmylLdjHXwGLHiW2HOXskLzSkm7ASie1WdyHVHZb4X8LjxCy62S0FPevBgat1a443Khx5HCMYR/8dQrlOI82GYTr8n9U6QQE4Li8XLw64DVP9HGs9jdbsfEdlIsiPWqB6ujlwiO6pyfmQGQCgjALA+oD87uDQLcgh+SDYgE0ZwmwGzbjeynZpoCrEE8A1GHhSwkM9khx6EJFacm9XzqoUGK0wB1f8su+51fqPglF1zye80IFA4wOMMAY+KUc9du/vQ98f0lfjsNSOC02CKYxbA5RaakQMAYjirsZraA57xLmCSIGMhhW4wClQdJBww6LLz463yZU4WPwyqU+ZW12aV5dVLb5RWXIbZKmdT74DfZajHvqgTYpb05L5cJl7ApMspUkKk="
-
-script: ci/travis.sh
-
-before_cache:
-- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock
-- rm -fr $HOME/.gradle/caches/*/plugin-resolution/
-
-cache:
- directories:
- - $HOME/.gradle/caches/
- - $HOME/.gradle/wrapper/
diff --git a/build.gradle b/build.gradle
index 5230ce018..64e7401df 100644
--- a/build.gradle
+++ b/build.gradle
@@ -128,6 +128,7 @@ subprojects {
links 'https://projectreactor.io/docs/core/release/api/'
links 'https://netty.io/4.1/api/'
}
+ failOnError = false
}
tasks.named("javadoc").configure {
diff --git a/ci/travis.sh b/ci/travis.sh
deleted file mode 100755
index df3fc1245..000000000
--- a/ci/travis.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/usr/bin/env bash
-
-if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then
-
- echo -e "Building PR #$TRAVIS_PULL_REQUEST [$TRAVIS_PULL_REQUEST_SLUG/$TRAVIS_PULL_REQUEST_BRANCH => $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH]"
- ./gradlew build
-
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ] && [ "$bintrayUser" != "" ] && [ "$TRAVIS_BRANCH" == "1.0.x" ] ; then
-
- echo -e "Building Develop Snapshot $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH/$TRAVIS_BUILD_NUMBER"
- ./gradlew \
- -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \
- -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \
- -PversionSuffix="-SNAPSHOT" \
- -PbuildNumber="$TRAVIS_BUILD_NUMBER" \
- build artifactoryPublish --stacktrace
-
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ] && [ "$bintrayUser" != "" ] ; then
-
- echo -e "Building Branch Snapshot $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH/$TRAVIS_BUILD_NUMBER"
- ./gradlew \
- -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \
- -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \
- -PversionSuffix="-${TRAVIS_BRANCH//\//-}-SNAPSHOT" \
- -PbuildNumber="$TRAVIS_BUILD_NUMBER" \
- build artifactoryPublish --stacktrace
-
-elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ] && [ "$bintrayUser" != "" ] ; then
-
- echo -e "Building Tag $TRAVIS_REPO_SLUG/$TRAVIS_TAG"
- ./gradlew \
- -Pversion="$TRAVIS_TAG" \
- -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \
- -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \
- -PbuildNumber="$TRAVIS_BUILD_NUMBER" \
- build bintrayUpload --stacktrace
-
-else
-
- echo -e "Building $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH"
- ./gradlew build
-
-fi
-
diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
index b6eaec7c9..eab70cc30 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java
@@ -213,7 +213,7 @@ public RSocketConnector metadataMimeType(String metadataMimeType) {
*
For mobile-to-server connections, the time interval between client {@code KEEPALIVE}
- * frames is often > 30,000ms.
+ * frames is often {@code >} 30,000ms.
*
*
* By default these are set to 20 seconds and 90 seconds respectively.
diff --git a/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java b/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java
index c431b3f3f..979743fb1 100644
--- a/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java
+++ b/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java
@@ -153,19 +153,19 @@ public T block(@Nullable Duration timeout) {
delay = System.nanoTime() + timeout.toNanos();
}
for (; ; ) {
- BiConsumer[] inners = this.subscribers;
+ subscribers = this.subscribers;
- if (inners == READY) {
+ if (subscribers == READY) {
final T value = this.value;
if (value != null) {
return value;
} else {
// value == null means racing between invalidate and this block
// thus, we have to update the state again and see what happened
- inners = this.subscribers;
+ subscribers = this.subscribers;
}
}
- if (inners == TERMINATED) {
+ if (subscribers == TERMINATED) {
RuntimeException re = Exceptions.propagate(this.t);
re = Exceptions.addSuppressed(re, new Exception("Terminated with an error"));
throw re;
@@ -174,6 +174,12 @@ public T block(@Nullable Duration timeout) {
throw new IllegalStateException("Timeout on Mono blocking read");
}
+ // connect again since invalidate() has happened in between
+ if (subscribers == EMPTY_UNSUBSCRIBED
+ && SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) {
+ this.doSubscribe();
+ }
+
Thread.sleep(1);
}
} catch (InterruptedException ie) {
@@ -186,6 +192,7 @@ public T block(@Nullable Duration timeout) {
@SuppressWarnings("unchecked")
final void terminate(Throwable t) {
if (isDisposed()) {
+ Operators.onErrorDropped(t, Context.empty());
return;
}
diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
index d2a438dfd..94d5e9a7a 100644
--- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
+++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java
@@ -115,13 +115,9 @@ void drainRegular(Subscriber super T> a) {
while (r != e) {
boolean d = done;
- T t;
- boolean empty;
-
- if (!pq.isEmpty()) {
- t = pq.poll();
- empty = false;
- } else {
+ T t = pq.poll();
+ boolean empty = t == null;
+ if (empty) {
t = q.poll();
empty = t == null;
}
@@ -196,8 +192,9 @@ void drainFused(Subscriber super T> a) {
}
public void drain() {
- if (WIP.getAndIncrement(this) != 0) {
- if ((!outputFused && cancelled) || terminated) {
+ final int previousWip = WIP.getAndIncrement(this);
+ if (previousWip != 0) {
+ if (previousWip < 0 || terminated) {
this.clear();
}
return;
@@ -231,6 +228,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber super T> a) {
return true;
}
if (d && empty) {
+ this.clear();
Throwable e = error;
hasDownstream = false;
if (e != null) {
@@ -330,11 +328,7 @@ public void subscribe(CoreSubscriber super T> actual) {
actual.onSubscribe(this);
this.actual = actual;
- if (cancelled) {
- this.hasDownstream = false;
- } else {
- drain();
- }
+ drain();
} else {
Operators.error(
actual,
@@ -388,6 +382,18 @@ public boolean isEmpty() {
@Override
public void clear() {
terminated = true;
+ for (; ; ) {
+ int wip = this.wip;
+
+ clearSafely();
+
+ if (WIP.compareAndSet(this, wip, Integer.MIN_VALUE)) {
+ return;
+ }
+ }
+ }
+
+ void clearSafely() {
if (DISCARD_GUARD.getAndIncrement(this) != 0) {
return;
}
@@ -428,34 +434,20 @@ public void dispose() {
error = new CancellationException("Disposed");
done = true;
- boolean once = true;
if (WIP.getAndIncrement(this) == 0) {
cancelled = true;
- int m = 1;
- for (; ; ) {
- final CoreSubscriber super T> a = this.actual;
-
- if (!outputFused || terminated) {
- clear();
- }
-
- if (a != null && once) {
- try {
- a.onError(error);
- } catch (Throwable ignored) {
- }
- }
+ final CoreSubscriber super T> a = this.actual;
- cancelled = true;
- once = false;
+ if (!outputFused || terminated) {
+ clear();
+ }
- int wip = this.wip;
- if (wip == m) {
- break;
+ if (a != null) {
+ try {
+ a.onError(error);
+ } catch (Throwable ignored) {
}
- m = wip;
}
-
hasDownstream = false;
}
}
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
index 1ce68cfeb..b5a3dcb83 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java
@@ -91,7 +91,6 @@
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;
-import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
import reactor.test.util.RaceTestUtils;
@@ -1082,15 +1081,11 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
Publisher> publisher2 = interaction2.apply(rule, payload2);
RaceTestUtils.race(
() -> rule.socket.dispose(),
- () ->
- RaceTestUtils.race(
- () -> publisher1.subscribe(assertSubscriber1),
- () -> publisher2.subscribe(assertSubscriber2),
- Schedulers.parallel()),
- Schedulers.parallel());
+ () -> publisher1.subscribe(assertSubscriber1),
+ () -> publisher2.subscribe(assertSubscriber2));
assertSubscriber1.await().assertTerminated();
- if (interactionType1 != REQUEST_FNF) {
+ if (interactionType1 != REQUEST_FNF && interactionType1 != METADATA_PUSH) {
assertSubscriber1.assertError(ClosedChannelException.class);
} else {
try {
@@ -1101,7 +1096,7 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests(
}
}
assertSubscriber2.await().assertTerminated();
- if (interactionType2 != REQUEST_FNF) {
+ if (interactionType2 != REQUEST_FNF && interactionType2 != METADATA_PUSH) {
assertSubscriber2.assertError(ClosedChannelException.class);
} else {
try {
diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java
index 0d0fbd8c0..76691adce 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java
@@ -84,8 +84,6 @@
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
-import reactor.core.scheduler.Scheduler;
-import reactor.core.scheduler.Schedulers;
import reactor.test.publisher.TestPublisher;
import reactor.test.util.RaceTestUtils;
@@ -340,7 +338,6 @@ public Flux requestChannel(Publisher payloads) {
@Test
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestChannelTest1() {
- Scheduler parallel = Schedulers.parallel();
Hooks.onErrorDropped((e) -> {});
ByteBufAllocator allocator = rule.alloc();
for (int i = 0; i < 10000; i++) {
@@ -366,17 +363,13 @@ public Flux requestChannel(Publisher payloads) {
ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, 1, Integer.MAX_VALUE);
FluxSink sink = sinks[0];
RaceTestUtils.race(
- () ->
- RaceTestUtils.race(
- () -> rule.connection.addToReceivedBuffer(requestNFrame),
- () -> rule.connection.addToReceivedBuffer(cancelFrame),
- parallel),
+ () -> rule.connection.addToReceivedBuffer(requestNFrame),
+ () -> rule.connection.addToReceivedBuffer(cancelFrame),
() -> {
sink.next(ByteBufPayload.create("d1", "m1"));
sink.next(ByteBufPayload.create("d2", "m2"));
sink.next(ByteBufPayload.create("d3", "m3"));
- },
- parallel);
+ });
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
@@ -387,7 +380,6 @@ public Flux requestChannel(Publisher payloads) {
@Test
public void
checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromUpstreamOnErrorFromRequestChannelTest1() {
- Scheduler parallel = Schedulers.parallel();
Hooks.onErrorDropped((e) -> {});
ByteBufAllocator allocator = rule.alloc();
for (int i = 0; i < 10000; i++) {
@@ -453,18 +445,14 @@ public Flux requestChannel(Publisher payloads) {
FluxSink sink = sinks[0];
RaceTestUtils.race(
- () ->
- RaceTestUtils.race(
- () -> rule.connection.addToReceivedBuffer(requestNFrame),
- () -> rule.connection.addToReceivedBuffer(nextFrame1, nextFrame2, nextFrame3),
- parallel),
+ () -> rule.connection.addToReceivedBuffer(requestNFrame),
+ () -> rule.connection.addToReceivedBuffer(nextFrame1, nextFrame2, nextFrame3),
() -> {
sink.next(np1);
sink.next(np2);
sink.next(np3);
sink.error(new RuntimeException());
- },
- parallel);
+ });
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
@@ -484,7 +472,6 @@ public Flux requestChannel(Publisher payloads) {
@Test
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestStreamTest1() {
- Scheduler parallel = Schedulers.parallel();
Hooks.onErrorDropped((e) -> {});
ByteBufAllocator allocator = rule.alloc();
for (int i = 0; i < 10000; i++) {
@@ -510,8 +497,7 @@ public Flux requestStream(Payload payload) {
sink.next(ByteBufPayload.create("d1", "m1"));
sink.next(ByteBufPayload.create("d2", "m2"));
sink.next(ByteBufPayload.create("d3", "m3"));
- },
- parallel);
+ });
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
@@ -521,7 +507,6 @@ public Flux requestStream(Payload payload) {
@Test
public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestResponseTest1() {
- Scheduler parallel = Schedulers.parallel();
Hooks.onErrorDropped((e) -> {});
ByteBufAllocator allocator = rule.alloc();
for (int i = 0; i < 10000; i++) {
@@ -550,8 +535,7 @@ public void subscribe(CoreSubscriber super Payload> actual) {
() -> rule.connection.addToReceivedBuffer(cancelFrame),
() -> {
sources[0].complete(ByteBufPayload.create("d1", "m1"));
- },
- parallel);
+ });
Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release);
diff --git a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java
index 8d96222df..ad3013f8e 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java
@@ -18,6 +18,7 @@
import static org.junit.Assert.assertEquals;
+import io.rsocket.internal.subscriber.AssertSubscriber;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -29,6 +30,7 @@
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -174,8 +176,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate()
reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_not_expire" + index);
reconnectMono.resolvingInner.mainSubscriber.onComplete();
}
- },
- Schedulers.parallel());
+ });
Assertions.assertThat(processor.isTerminated()).isTrue();
@@ -231,9 +232,8 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates(
reconnectMono.resolvingInner.mainSubscriber.onComplete();
RaceTestUtils.race(
- () ->
- RaceTestUtils.race(
- reconnectMono::invalidate, reconnectMono::invalidate, Schedulers.parallel()),
+ reconnectMono::invalidate,
+ reconnectMono::invalidate,
() -> {
reconnectMono.subscribe(racerProcessor);
if (!racerProcessor.isTerminated()) {
@@ -241,8 +241,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates(
"value_to_possibly_expire" + index);
reconnectMono.resolvingInner.mainSubscriber.onComplete();
}
- },
- Schedulers.parallel());
+ });
Assertions.assertThat(processor.isTerminated()).isTrue();
@@ -284,46 +283,54 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() {
Hooks.onErrorDropped(t -> {});
for (int i = 0; i < 10000; i++) {
final int index = i;
- final TestPublisher cold =
- TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW);
+ final Mono source =
+ Mono.fromSupplier(
+ new Supplier() {
+ boolean once = false;
- final ReconnectMono reconnectMono =
- cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue()));
+ @Override
+ public String get() {
- final MonoProcessor processor = reconnectMono.subscribeWith(MonoProcessor.create());
+ if (!once) {
+ once = true;
+ return "value_to_expire" + index;
+ }
+
+ return "value_to_not_expire" + index;
+ }
+ });
+
+ final ReconnectMono reconnectMono =
+ new ReconnectMono<>(
+ source.subscribeOn(Schedulers.boundedElastic()), onExpire(), onValue());
Assertions.assertThat(expired).isEmpty();
Assertions.assertThat(received).isEmpty();
- reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_expire" + i);
- reconnectMono.resolvingInner.mainSubscriber.onComplete();
+ final AssertSubscriber subscriber =
+ reconnectMono.subscribeWith(new AssertSubscriber<>());
- RaceTestUtils.race(
- () ->
- Assertions.assertThat(reconnectMono.block())
- .matches(
- (v) ->
- v.equals("value_to_not_expire" + index)
- || v.equals("value_to_expire" + index)),
- () ->
- RaceTestUtils.race(
- reconnectMono::invalidate,
- () -> {
- for (; ; ) {
- if (reconnectMono.resolvingInner.subscribers != ResolvingOperator.READY) {
- reconnectMono.resolvingInner.mainSubscriber.onNext(
- "value_to_not_expire" + index);
- reconnectMono.resolvingInner.mainSubscriber.onComplete();
- break;
- }
- }
- },
- Schedulers.parallel()),
- Schedulers.parallel());
+ subscriber.await().assertComplete();
- Assertions.assertThat(processor.isTerminated()).isTrue();
+ Assertions.assertThat(expired).isEmpty();
- Assertions.assertThat(processor.peek()).isEqualTo("value_to_expire" + i);
+ try {
+
+ RaceTestUtils.race(
+ () ->
+ Assertions.assertThat(reconnectMono.block())
+ .matches(
+ (v) ->
+ v.equals("value_to_not_expire" + index)
+ || v.equals("value_to_expire" + index)),
+ reconnectMono::invalidate);
+ } catch (Throwable t) {
+ t.printStackTrace();
+ }
+
+ subscriber.assertTerminated();
+
+ subscriber.assertValues("value_to_expire" + i);
Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i);
if (reconnectMono.resolvingInner.subscribers == ResolvingOperator.READY) {
diff --git a/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java b/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java
index 29748abbe..608e1a336 100644
--- a/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java
+++ b/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java
@@ -38,7 +38,6 @@
import org.reactivestreams.Subscription;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.MonoProcessor;
-import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.util.RaceTestUtils;
import reactor.util.retry.Retry;
@@ -194,8 +193,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate()
if (!processor2.isTerminated()) {
self.complete(valueToSend2);
}
- },
- Schedulers.parallel()))
+ }))
.then(
self -> {
if (self.isPending()) {
@@ -270,16 +268,14 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates(
.then(
self ->
RaceTestUtils.race(
- () ->
- RaceTestUtils.race(
- self::invalidate, self::invalidate, Schedulers.parallel()),
+ self::invalidate,
+ self::invalidate,
() -> {
self.observe(consumer2);
if (!processor2.isTerminated()) {
self.complete(valueToSend2);
}
- },
- Schedulers.parallel()))
+ }))
.then(
self -> {
if (!self.isPending()) {
@@ -371,19 +367,15 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() {
() ->
Assertions.assertThat(self.block(null))
.matches((v) -> v.equals(valueToSend) || v.equals(valueToSend2)),
- () ->
- RaceTestUtils.race(
- self::invalidate,
- () -> {
- for (; ; ) {
- if (self.subscribers != ResolvingOperator.READY) {
- self.complete(valueToSend2);
- break;
- }
- }
- },
- Schedulers.parallel()),
- Schedulers.parallel()))
+ self::invalidate,
+ () -> {
+ for (; ; ) {
+ if (self.subscribers != ResolvingOperator.READY) {
+ self.complete(valueToSend2);
+ break;
+ }
+ }
+ }))
.then(
self -> {
if (self.isPending()) {
diff --git a/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java b/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java
index b3f596a37..b09548245 100644
--- a/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java
@@ -217,6 +217,8 @@ void fromCustomRSocketException() {
assertThat(Exceptions.from(0, byteBuf))
.hasMessage("Invalid Error frame in Stream ID 0: 0x%08X '%s'", randomCode, "test-message")
.isInstanceOf(IllegalArgumentException.class);
+
+ byteBuf.release();
}
}
diff --git a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java
index 271c08664..5177a65be 100644
--- a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java
+++ b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java
@@ -28,13 +28,11 @@
import java.time.Duration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.RepeatedTest;
-import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.Fuseable;
import reactor.core.publisher.Hooks;
-import reactor.core.publisher.Operators;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import reactor.test.util.RaceTestUtils;
@@ -110,7 +108,7 @@ public void testPrioritizedSending(boolean fusedCase) {
public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) {
final LeaksTrackingByteBufAllocator allocator =
LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
- for (int i = 0; i < 100000; i++) {
+ for (int i = 0; i < 10000; i++) {
final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
final ByteBuf buffer1 = allocator.buffer(1);
@@ -123,68 +121,247 @@ public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnab
unboundedProcessor.subscribe(assertSubscriber);
RaceTestUtils.race(
- () ->
- RaceTestUtils.race(
- () ->
- RaceTestUtils.race(
- () -> {
- unboundedProcessor.onNext(buffer1);
- unboundedProcessor.onNext(buffer2);
- },
- unboundedProcessor::dispose,
- Schedulers.elastic()),
- assertSubscriber::cancel,
- Schedulers.elastic()),
+ () -> {
+ unboundedProcessor.onNext(buffer1);
+ unboundedProcessor.onNext(buffer2);
+ },
+ unboundedProcessor::dispose,
+ assertSubscriber::cancel,
() -> {
assertSubscriber.request(1);
assertSubscriber.request(1);
+ });
+
+ assertSubscriber.values().forEach(ReferenceCountUtil::release);
+
+ allocator.assertHasNoLeaks();
+ }
+ }
+
+ @ParameterizedTest(
+ name =
+ "Ensures that racing between onNext | dispose | cancel | request(n) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]")
+ @ValueSource(booleans = {true, false})
+ public void smokeTest1(boolean withFusionEnabled) {
+ final LeaksTrackingByteBufAllocator allocator =
+ LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
+ final RuntimeException runtimeException = new RuntimeException("test");
+ for (int i = 0; i < 10000; i++) {
+ final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
+
+ final ByteBuf buffer1 = allocator.buffer(1);
+ final ByteBuf buffer2 = allocator.buffer(2);
+ final ByteBuf buffer3 = allocator.buffer(3);
+ final ByteBuf buffer4 = allocator.buffer(4);
+
+ final AssertSubscriber assertSubscriber =
+ new AssertSubscriber(0)
+ .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
+
+ unboundedProcessor.subscribe(assertSubscriber);
+
+ RaceTestUtils.race(
+ () -> {
+ unboundedProcessor.onNext(buffer1);
+ unboundedProcessor.onNextPrioritized(buffer2);
+ },
+ () -> {
+ unboundedProcessor.onNextPrioritized(buffer3);
+ unboundedProcessor.onNext(buffer4);
+ },
+ unboundedProcessor::dispose,
+ unboundedProcessor::onComplete,
+ () -> unboundedProcessor.onError(runtimeException),
+ assertSubscriber::cancel,
+ () -> {
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ });
+
+ assertSubscriber.values().forEach(ReferenceCountUtil::release);
+
+ allocator.assertHasNoLeaks();
+ }
+ }
+
+ @ParameterizedTest(
+ name =
+ "Ensures that racing between onNext | dispose | subscribe | request(n) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]")
+ @ValueSource(booleans = {true, false})
+ @Disabled("hard to support in 1.0.x")
+ public void smokeTest2(boolean withFusionEnabled) {
+ final LeaksTrackingByteBufAllocator allocator =
+ LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
+ final RuntimeException runtimeException = new RuntimeException("test");
+ for (int i = 0; i < 10000; i++) {
+ final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
+
+ final ByteBuf buffer1 = allocator.buffer(1);
+ final ByteBuf buffer2 = allocator.buffer(2);
+ final ByteBuf buffer3 = allocator.buffer(3);
+ final ByteBuf buffer4 = allocator.buffer(4);
+
+ final AssertSubscriber assertSubscriber =
+ new AssertSubscriber(0)
+ .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
+
+ RaceTestUtils.race(
+ Schedulers.boundedElastic(),
+ () -> {
+ unboundedProcessor.onNext(buffer1);
+ unboundedProcessor.onNextPrioritized(buffer2);
+ },
+ () -> {
+ unboundedProcessor.onNextPrioritized(buffer3);
+ unboundedProcessor.onNext(buffer4);
},
- Schedulers.elastic());
+ unboundedProcessor::dispose,
+ unboundedProcessor::onComplete,
+ () -> unboundedProcessor.onError(runtimeException),
+ () -> {
+ unboundedProcessor.subscribe(assertSubscriber);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ });
- assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease);
+ assertSubscriber.values().forEach(ReferenceCountUtil::release);
allocator.assertHasNoLeaks();
}
}
- @RepeatedTest(
+ @ParameterizedTest(
name =
- "Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks",
- value = 100000)
- @Timeout(60)
- public void ensuresAsyncFusionAndDisposureHasNoDeadlock() {
- // TODO: enable leaks tracking
- // final LeaksTrackingByteBufAllocator allocator =
- // LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
- final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
-
- // final ByteBuf buffer1 = allocator.buffer(1);
- // final ByteBuf buffer2 = allocator.buffer(2);
-
- final AssertSubscriber assertSubscriber =
- new AssertSubscriber<>(Operators.enableOnDiscard(null, ReferenceCountUtil::safeRelease));
-
- unboundedProcessor.publishOn(Schedulers.parallel()).subscribe(assertSubscriber);
-
- RaceTestUtils.race(
- () -> {
- // unboundedProcessor.onNext(buffer1);
- // unboundedProcessor.onNext(buffer2);
- unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
- unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
- unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
- unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
- unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
- unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER);
- unboundedProcessor.dispose();
- },
- unboundedProcessor::dispose);
-
- assertSubscriber
- .await(Duration.ofSeconds(50))
- .values()
- .forEach(ReferenceCountUtil::safeRelease);
-
- // allocator.assertHasNoLeaks();
+ "Ensures that racing between onNext | dispose | subscribe(cancelled) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]")
+ @ValueSource(booleans = {true, false})
+ public void smokeTest3(boolean withFusionEnabled) {
+ final LeaksTrackingByteBufAllocator allocator =
+ LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
+ final RuntimeException runtimeException = new RuntimeException("test");
+ for (int i = 0; i < 10000; i++) {
+ final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
+
+ final ByteBuf buffer1 = allocator.buffer(1);
+ final ByteBuf buffer2 = allocator.buffer(2);
+ final ByteBuf buffer3 = allocator.buffer(3);
+ final ByteBuf buffer4 = allocator.buffer(4);
+
+ final AssertSubscriber assertSubscriber =
+ new AssertSubscriber(0)
+ .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
+
+ assertSubscriber.cancel();
+
+ RaceTestUtils.race(
+ Schedulers.boundedElastic(),
+ () -> {
+ unboundedProcessor.onNext(buffer1);
+ unboundedProcessor.onNextPrioritized(buffer2);
+ },
+ () -> {
+ unboundedProcessor.onNextPrioritized(buffer3);
+ unboundedProcessor.onNext(buffer4);
+ },
+ unboundedProcessor::dispose,
+ unboundedProcessor::onComplete,
+ () -> unboundedProcessor.onError(runtimeException),
+ () -> unboundedProcessor.subscribe(assertSubscriber));
+
+ assertSubscriber.values().forEach(ReferenceCountUtil::release);
+
+ allocator.assertHasNoLeaks();
+ }
+ }
+
+ @ParameterizedTest(
+ name =
+ "Ensures that racing between onNext | dispose | subscribe(cancelled) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]")
+ @ValueSource(booleans = {true, false})
+ public void smokeTest31(boolean withFusionEnabled) {
+ final LeaksTrackingByteBufAllocator allocator =
+ LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
+ final RuntimeException runtimeException = new RuntimeException("test");
+ for (int i = 0; i < 10000; i++) {
+ final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
+
+ final ByteBuf buffer1 = allocator.buffer(1);
+ final ByteBuf buffer2 = allocator.buffer(2);
+ final ByteBuf buffer3 = allocator.buffer(3);
+ final ByteBuf buffer4 = allocator.buffer(4);
+
+ final AssertSubscriber assertSubscriber =
+ new AssertSubscriber(0)
+ .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
+
+ RaceTestUtils.race(
+ Schedulers.boundedElastic(),
+ () -> {
+ unboundedProcessor.onNext(buffer1);
+ unboundedProcessor.onNextPrioritized(buffer2);
+ },
+ () -> {
+ unboundedProcessor.onNextPrioritized(buffer3);
+ unboundedProcessor.onNext(buffer4);
+ },
+ unboundedProcessor::dispose,
+ unboundedProcessor::onComplete,
+ () -> unboundedProcessor.onError(runtimeException),
+ () -> unboundedProcessor.subscribe(assertSubscriber),
+ () -> {
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ assertSubscriber.request(1);
+ },
+ assertSubscriber::cancel);
+
+ assertSubscriber.values().forEach(ReferenceCountUtil::release);
+ allocator.assertHasNoLeaks();
+ }
+ }
+
+ @ParameterizedTest(
+ name =
+ "Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks; mode[fusionEnabled={0}]")
+ @ValueSource(booleans = {true, false})
+ public void ensuresAsyncFusionAndDisposureHasNoDeadlock(boolean withFusionEnabled) {
+ final LeaksTrackingByteBufAllocator allocator =
+ LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT);
+
+ for (int i = 0; i < 10000; i++) {
+ final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>();
+ final ByteBuf buffer1 = allocator.buffer(1);
+ final ByteBuf buffer2 = allocator.buffer(2);
+ final ByteBuf buffer3 = allocator.buffer(3);
+ final ByteBuf buffer4 = allocator.buffer(4);
+ final ByteBuf buffer5 = allocator.buffer(5);
+ final ByteBuf buffer6 = allocator.buffer(6);
+
+ final AssertSubscriber assertSubscriber =
+ new AssertSubscriber()
+ .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE);
+
+ unboundedProcessor.subscribe(assertSubscriber);
+
+ RaceTestUtils.race(
+ () -> {
+ unboundedProcessor.onNext(buffer1);
+ unboundedProcessor.onNext(buffer2);
+ unboundedProcessor.onNext(buffer3);
+ unboundedProcessor.onNext(buffer4);
+ unboundedProcessor.onNext(buffer5);
+ unboundedProcessor.onNext(buffer6);
+ unboundedProcessor.dispose();
+ },
+ unboundedProcessor::dispose);
+
+ assertSubscriber.await(Duration.ofSeconds(50)).values().forEach(ReferenceCountUtil::release);
+ }
+
+ allocator.assertHasNoLeaks();
}
}
diff --git a/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java b/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java
index 28206b4ff..54b99c797 100644
--- a/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java
+++ b/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java
@@ -864,8 +864,11 @@ public void cancel() {
if (a != null && a != Operators.cancelledSubscription()) {
a.cancel();
- if (establishedFusionMode == Fuseable.ASYNC && WIP.getAndIncrement(this) == 0) {
- qs.clear();
+ if (establishedFusionMode == Fuseable.ASYNC) {
+ final int previousState = markWorkAdded();
+ if (!isWorkInProgress(previousState)) {
+ clearAndFinalize();
+ }
}
}
}
@@ -924,11 +927,54 @@ public void onNext(T t) {
}
}
- void drain() {
- if (this.wip != 0 || WIP.getAndIncrement(this) != 0) {
- if (isCancelled()) {
- qs.clear();
+ static boolean isFinalized(int state) {
+ return state == Integer.MIN_VALUE;
+ }
+
+ static boolean isWorkInProgress(int state) {
+ return state > 0;
+ }
+
+ int markWorkAdded() {
+ for (; ; ) {
+ int state = this.wip;
+
+ if (isFinalized(state)) {
+ return state;
+ }
+
+ if ((state & Integer.MAX_VALUE) == Integer.MAX_VALUE) {
+ return state;
+ }
+ int nextState = state + 1;
+
+ if (WIP.compareAndSet(this, state, nextState)) {
+ return state;
+ }
+ }
+ }
+
+ void clearAndFinalize() {
+ final Fuseable.QueueSubscription qs = this.qs;
+ for (; ; ) {
+ int state = this.wip;
+
+ qs.clear();
+
+ if (WIP.compareAndSet(this, state, Integer.MIN_VALUE)) {
+ return;
}
+ }
+ }
+
+ void drain() {
+ final int previousState = markWorkAdded();
+ if (isWorkInProgress(previousState)) {
+ return;
+ }
+
+ if (isFinalized(previousState)) {
+ qs.clear();
return;
}
@@ -936,14 +982,14 @@ void drain() {
int m = 1;
for (; ; ) {
if (isCancelled()) {
- qs.clear();
+ clearAndFinalize();
break;
}
boolean done = this.done;
t = qs.poll();
if (t == null) {
if (done) {
- qs.clear(); // clear upstream to terminated it due to the contract
+ clearAndFinalize();
cdl.countDown();
return;
}
@@ -973,39 +1019,41 @@ public void onSubscribe(Subscription s) {
subscriptionCount++;
int requestMode = requestedFusionMode;
if (requestMode >= 0) {
- if (!setWithoutRequesting(s)) {
- if (!isCancelled()) {
- errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount));
- }
- } else {
- if (s instanceof Fuseable.QueueSubscription) {
- this.qs = (Fuseable.QueueSubscription) s;
+ if (s instanceof Fuseable.QueueSubscription) {
+ this.qs = (Fuseable.QueueSubscription) s;
- int m = qs.requestFusion(requestMode);
- establishedFusionMode = m;
+ int m = qs.requestFusion(requestMode);
+ establishedFusionMode = m;
- if (m == Fuseable.SYNC) {
- for (; ; ) {
- T v = qs.poll();
- if (v == null) {
- onComplete();
- break;
- }
+ if (!setWithoutRequesting(s)) {
+ qs.clear();
+ if (!isCancelled()) {
+ errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount));
+ }
+ return;
+ }
- onNext(v);
+ if (m == Fuseable.SYNC) {
+ for (; ; ) {
+ T v = qs.poll();
+ if (v == null) {
+ onComplete();
+ break;
}
- } else {
- requestDeferred();
+
+ onNext(v);
}
} else {
requestDeferred();
}
+
+ return;
}
- } else {
- if (!set(s)) {
- if (!isCancelled()) {
- errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount));
- }
+ }
+
+ if (!set(s)) {
+ if (!isCancelled()) {
+ errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount));
}
}
}