From 480ef2d950a73ec865b35cbf8b8ad60f34621241 Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Tue, 18 May 2021 09:26:36 +0300 Subject: [PATCH 1/2] migrates from deprecated RaceTestUtils.race; fixes observed issues Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- build.gradle | 1 + .../io/rsocket/core/RSocketConnector.java | 2 +- .../io/rsocket/core/ResolvingOperator.java | 15 +- .../rsocket/internal/UnboundedProcessor.java | 64 ++-- .../io/rsocket/core/RSocketRequesterTest.java | 13 +- .../io/rsocket/core/RSocketResponderTest.java | 32 +- .../io/rsocket/core/ReconnectMonoTests.java | 83 ++--- .../rsocket/core/ResolvingOperatorTests.java | 34 +-- .../io/rsocket/exceptions/ExceptionsTest.java | 2 + .../internal/UnboundedProcessorTest.java | 289 ++++++++++++++---- .../internal/subscriber/AssertSubscriber.java | 112 +++++-- 11 files changed, 426 insertions(+), 221 deletions(-) diff --git a/build.gradle b/build.gradle index 5230ce018..64e7401df 100644 --- a/build.gradle +++ b/build.gradle @@ -128,6 +128,7 @@ subprojects { links 'https://projectreactor.io/docs/core/release/api/' links 'https://netty.io/4.1/api/' } + failOnError = false } tasks.named("javadoc").configure { diff --git a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java index b6eaec7c9..eab70cc30 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RSocketConnector.java @@ -213,7 +213,7 @@ public RSocketConnector metadataMimeType(String metadataMimeType) { *
  • For server-to-server connections, a reasonable time interval between client {@code * KEEPALIVE} frames is 500ms. *
  • For mobile-to-server connections, the time interval between client {@code KEEPALIVE} - * frames is often > 30,000ms. + * frames is often {@code >} 30,000ms. * * *

    By default these are set to 20 seconds and 90 seconds respectively. diff --git a/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java b/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java index c431b3f3f..979743fb1 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/ResolvingOperator.java @@ -153,19 +153,19 @@ public T block(@Nullable Duration timeout) { delay = System.nanoTime() + timeout.toNanos(); } for (; ; ) { - BiConsumer[] inners = this.subscribers; + subscribers = this.subscribers; - if (inners == READY) { + if (subscribers == READY) { final T value = this.value; if (value != null) { return value; } else { // value == null means racing between invalidate and this block // thus, we have to update the state again and see what happened - inners = this.subscribers; + subscribers = this.subscribers; } } - if (inners == TERMINATED) { + if (subscribers == TERMINATED) { RuntimeException re = Exceptions.propagate(this.t); re = Exceptions.addSuppressed(re, new Exception("Terminated with an error")); throw re; @@ -174,6 +174,12 @@ public T block(@Nullable Duration timeout) { throw new IllegalStateException("Timeout on Mono blocking read"); } + // connect again since invalidate() has happened in between + if (subscribers == EMPTY_UNSUBSCRIBED + && SUBSCRIBERS.compareAndSet(this, EMPTY_UNSUBSCRIBED, EMPTY_SUBSCRIBED)) { + this.doSubscribe(); + } + Thread.sleep(1); } } catch (InterruptedException ie) { @@ -186,6 +192,7 @@ public T block(@Nullable Duration timeout) { @SuppressWarnings("unchecked") final void terminate(Throwable t) { if (isDisposed()) { + Operators.onErrorDropped(t, Context.empty()); return; } diff --git a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java index d2a438dfd..94d5e9a7a 100644 --- a/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java +++ b/rsocket-core/src/main/java/io/rsocket/internal/UnboundedProcessor.java @@ -115,13 +115,9 @@ void drainRegular(Subscriber a) { while (r != e) { boolean d = done; - T t; - boolean empty; - - if (!pq.isEmpty()) { - t = pq.poll(); - empty = false; - } else { + T t = pq.poll(); + boolean empty = t == null; + if (empty) { t = q.poll(); empty = t == null; } @@ -196,8 +192,9 @@ void drainFused(Subscriber a) { } public void drain() { - if (WIP.getAndIncrement(this) != 0) { - if ((!outputFused && cancelled) || terminated) { + final int previousWip = WIP.getAndIncrement(this); + if (previousWip != 0) { + if (previousWip < 0 || terminated) { this.clear(); } return; @@ -231,6 +228,7 @@ boolean checkTerminated(boolean d, boolean empty, Subscriber a) { return true; } if (d && empty) { + this.clear(); Throwable e = error; hasDownstream = false; if (e != null) { @@ -330,11 +328,7 @@ public void subscribe(CoreSubscriber actual) { actual.onSubscribe(this); this.actual = actual; - if (cancelled) { - this.hasDownstream = false; - } else { - drain(); - } + drain(); } else { Operators.error( actual, @@ -388,6 +382,18 @@ public boolean isEmpty() { @Override public void clear() { terminated = true; + for (; ; ) { + int wip = this.wip; + + clearSafely(); + + if (WIP.compareAndSet(this, wip, Integer.MIN_VALUE)) { + return; + } + } + } + + void clearSafely() { if (DISCARD_GUARD.getAndIncrement(this) != 0) { return; } @@ -428,34 +434,20 @@ public void dispose() { error = new CancellationException("Disposed"); done = true; - boolean once = true; if (WIP.getAndIncrement(this) == 0) { cancelled = true; - int m = 1; - for (; ; ) { - final CoreSubscriber a = this.actual; - - if (!outputFused || terminated) { - clear(); - } - - if (a != null && once) { - try { - a.onError(error); - } catch (Throwable ignored) { - } - } + final CoreSubscriber a = this.actual; - cancelled = true; - once = false; + if (!outputFused || terminated) { + clear(); + } - int wip = this.wip; - if (wip == m) { - break; + if (a != null) { + try { + a.onError(error); + } catch (Throwable ignored) { } - m = wip; } - hasDownstream = false; } } diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java index 1ce68cfeb..b5a3dcb83 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java @@ -91,7 +91,6 @@ import reactor.core.publisher.Mono; import reactor.core.publisher.MonoProcessor; import reactor.core.publisher.UnicastProcessor; -import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; import reactor.test.util.RaceTestUtils; @@ -1082,15 +1081,11 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests( Publisher publisher2 = interaction2.apply(rule, payload2); RaceTestUtils.race( () -> rule.socket.dispose(), - () -> - RaceTestUtils.race( - () -> publisher1.subscribe(assertSubscriber1), - () -> publisher2.subscribe(assertSubscriber2), - Schedulers.parallel()), - Schedulers.parallel()); + () -> publisher1.subscribe(assertSubscriber1), + () -> publisher2.subscribe(assertSubscriber2)); assertSubscriber1.await().assertTerminated(); - if (interactionType1 != REQUEST_FNF) { + if (interactionType1 != REQUEST_FNF && interactionType1 != METADATA_PUSH) { assertSubscriber1.assertError(ClosedChannelException.class); } else { try { @@ -1101,7 +1096,7 @@ public void shouldTerminateAllStreamsIfThereRacingBetweenDisposeAndRequests( } } assertSubscriber2.await().assertTerminated(); - if (interactionType2 != REQUEST_FNF) { + if (interactionType2 != REQUEST_FNF && interactionType2 != METADATA_PUSH) { assertSubscriber2.assertError(ClosedChannelException.class); } else { try { diff --git a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java index 0d0fbd8c0..76691adce 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java +++ b/rsocket-core/src/test/java/io/rsocket/core/RSocketResponderTest.java @@ -84,8 +84,6 @@ import reactor.core.publisher.Hooks; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import reactor.test.publisher.TestPublisher; import reactor.test.util.RaceTestUtils; @@ -340,7 +338,6 @@ public Flux requestChannel(Publisher payloads) { @Test public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestChannelTest1() { - Scheduler parallel = Schedulers.parallel(); Hooks.onErrorDropped((e) -> {}); ByteBufAllocator allocator = rule.alloc(); for (int i = 0; i < 10000; i++) { @@ -366,17 +363,13 @@ public Flux requestChannel(Publisher payloads) { ByteBuf requestNFrame = RequestNFrameCodec.encode(allocator, 1, Integer.MAX_VALUE); FluxSink sink = sinks[0]; RaceTestUtils.race( - () -> - RaceTestUtils.race( - () -> rule.connection.addToReceivedBuffer(requestNFrame), - () -> rule.connection.addToReceivedBuffer(cancelFrame), - parallel), + () -> rule.connection.addToReceivedBuffer(requestNFrame), + () -> rule.connection.addToReceivedBuffer(cancelFrame), () -> { sink.next(ByteBufPayload.create("d1", "m1")); sink.next(ByteBufPayload.create("d2", "m2")); sink.next(ByteBufPayload.create("d3", "m3")); - }, - parallel); + }); Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); @@ -387,7 +380,6 @@ public Flux requestChannel(Publisher payloads) { @Test public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromUpstreamOnErrorFromRequestChannelTest1() { - Scheduler parallel = Schedulers.parallel(); Hooks.onErrorDropped((e) -> {}); ByteBufAllocator allocator = rule.alloc(); for (int i = 0; i < 10000; i++) { @@ -453,18 +445,14 @@ public Flux requestChannel(Publisher payloads) { FluxSink sink = sinks[0]; RaceTestUtils.race( - () -> - RaceTestUtils.race( - () -> rule.connection.addToReceivedBuffer(requestNFrame), - () -> rule.connection.addToReceivedBuffer(nextFrame1, nextFrame2, nextFrame3), - parallel), + () -> rule.connection.addToReceivedBuffer(requestNFrame), + () -> rule.connection.addToReceivedBuffer(nextFrame1, nextFrame2, nextFrame3), () -> { sink.next(np1); sink.next(np2); sink.next(np3); sink.error(new RuntimeException()); - }, - parallel); + }); Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); @@ -484,7 +472,6 @@ public Flux requestChannel(Publisher payloads) { @Test public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestStreamTest1() { - Scheduler parallel = Schedulers.parallel(); Hooks.onErrorDropped((e) -> {}); ByteBufAllocator allocator = rule.alloc(); for (int i = 0; i < 10000; i++) { @@ -510,8 +497,7 @@ public Flux requestStream(Payload payload) { sink.next(ByteBufPayload.create("d1", "m1")); sink.next(ByteBufPayload.create("d2", "m2")); sink.next(ByteBufPayload.create("d3", "m3")); - }, - parallel); + }); Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); @@ -521,7 +507,6 @@ public Flux requestStream(Payload payload) { @Test public void checkNoLeaksOnRacingBetweenDownstreamCancelAndOnNextFromRequestResponseTest1() { - Scheduler parallel = Schedulers.parallel(); Hooks.onErrorDropped((e) -> {}); ByteBufAllocator allocator = rule.alloc(); for (int i = 0; i < 10000; i++) { @@ -550,8 +535,7 @@ public void subscribe(CoreSubscriber actual) { () -> rule.connection.addToReceivedBuffer(cancelFrame), () -> { sources[0].complete(ByteBufPayload.create("d1", "m1")); - }, - parallel); + }); Assertions.assertThat(rule.connection.getSent()).allMatch(ReferenceCounted::release); diff --git a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java index 8d96222df..ad3013f8e 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ReconnectMonoTests.java @@ -18,6 +18,7 @@ import static org.junit.Assert.assertEquals; +import io.rsocket.internal.subscriber.AssertSubscriber; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.assertj.core.api.Assertions; import org.junit.Test; @@ -174,8 +176,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_not_expire" + index); reconnectMono.resolvingInner.mainSubscriber.onComplete(); } - }, - Schedulers.parallel()); + }); Assertions.assertThat(processor.isTerminated()).isTrue(); @@ -231,9 +232,8 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates( reconnectMono.resolvingInner.mainSubscriber.onComplete(); RaceTestUtils.race( - () -> - RaceTestUtils.race( - reconnectMono::invalidate, reconnectMono::invalidate, Schedulers.parallel()), + reconnectMono::invalidate, + reconnectMono::invalidate, () -> { reconnectMono.subscribe(racerProcessor); if (!racerProcessor.isTerminated()) { @@ -241,8 +241,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates( "value_to_possibly_expire" + index); reconnectMono.resolvingInner.mainSubscriber.onComplete(); } - }, - Schedulers.parallel()); + }); Assertions.assertThat(processor.isTerminated()).isTrue(); @@ -284,46 +283,54 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { Hooks.onErrorDropped(t -> {}); for (int i = 0; i < 10000; i++) { final int index = i; - final TestPublisher cold = - TestPublisher.createNoncompliant(TestPublisher.Violation.REQUEST_OVERFLOW); + final Mono source = + Mono.fromSupplier( + new Supplier() { + boolean once = false; - final ReconnectMono reconnectMono = - cold.mono().as(source -> new ReconnectMono<>(source, onExpire(), onValue())); + @Override + public String get() { - final MonoProcessor processor = reconnectMono.subscribeWith(MonoProcessor.create()); + if (!once) { + once = true; + return "value_to_expire" + index; + } + + return "value_to_not_expire" + index; + } + }); + + final ReconnectMono reconnectMono = + new ReconnectMono<>( + source.subscribeOn(Schedulers.boundedElastic()), onExpire(), onValue()); Assertions.assertThat(expired).isEmpty(); Assertions.assertThat(received).isEmpty(); - reconnectMono.resolvingInner.mainSubscriber.onNext("value_to_expire" + i); - reconnectMono.resolvingInner.mainSubscriber.onComplete(); + final AssertSubscriber subscriber = + reconnectMono.subscribeWith(new AssertSubscriber<>()); - RaceTestUtils.race( - () -> - Assertions.assertThat(reconnectMono.block()) - .matches( - (v) -> - v.equals("value_to_not_expire" + index) - || v.equals("value_to_expire" + index)), - () -> - RaceTestUtils.race( - reconnectMono::invalidate, - () -> { - for (; ; ) { - if (reconnectMono.resolvingInner.subscribers != ResolvingOperator.READY) { - reconnectMono.resolvingInner.mainSubscriber.onNext( - "value_to_not_expire" + index); - reconnectMono.resolvingInner.mainSubscriber.onComplete(); - break; - } - } - }, - Schedulers.parallel()), - Schedulers.parallel()); + subscriber.await().assertComplete(); - Assertions.assertThat(processor.isTerminated()).isTrue(); + Assertions.assertThat(expired).isEmpty(); - Assertions.assertThat(processor.peek()).isEqualTo("value_to_expire" + i); + try { + + RaceTestUtils.race( + () -> + Assertions.assertThat(reconnectMono.block()) + .matches( + (v) -> + v.equals("value_to_not_expire" + index) + || v.equals("value_to_expire" + index)), + reconnectMono::invalidate); + } catch (Throwable t) { + t.printStackTrace(); + } + + subscriber.assertTerminated(); + + subscriber.assertValues("value_to_expire" + i); Assertions.assertThat(expired).hasSize(1).containsOnly("value_to_expire" + i); if (reconnectMono.resolvingInner.subscribers == ResolvingOperator.READY) { diff --git a/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java b/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java index 29748abbe..608e1a336 100644 --- a/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java +++ b/rsocket-core/src/test/java/io/rsocket/core/ResolvingOperatorTests.java @@ -38,7 +38,6 @@ import org.reactivestreams.Subscription; import reactor.core.publisher.Hooks; import reactor.core.publisher.MonoProcessor; -import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.util.RaceTestUtils; import reactor.util.retry.Retry; @@ -194,8 +193,7 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidate() if (!processor2.isTerminated()) { self.complete(valueToSend2); } - }, - Schedulers.parallel())) + })) .then( self -> { if (self.isPending()) { @@ -270,16 +268,14 @@ public void shouldNotExpireNewlyResolvedValueIfSubscribeIsRacingWithInvalidates( .then( self -> RaceTestUtils.race( - () -> - RaceTestUtils.race( - self::invalidate, self::invalidate, Schedulers.parallel()), + self::invalidate, + self::invalidate, () -> { self.observe(consumer2); if (!processor2.isTerminated()) { self.complete(valueToSend2); } - }, - Schedulers.parallel())) + })) .then( self -> { if (!self.isPending()) { @@ -371,19 +367,15 @@ public void shouldNotExpireNewlyResolvedValueIfBlockIsRacingWithInvalidate() { () -> Assertions.assertThat(self.block(null)) .matches((v) -> v.equals(valueToSend) || v.equals(valueToSend2)), - () -> - RaceTestUtils.race( - self::invalidate, - () -> { - for (; ; ) { - if (self.subscribers != ResolvingOperator.READY) { - self.complete(valueToSend2); - break; - } - } - }, - Schedulers.parallel()), - Schedulers.parallel())) + self::invalidate, + () -> { + for (; ; ) { + if (self.subscribers != ResolvingOperator.READY) { + self.complete(valueToSend2); + break; + } + } + })) .then( self -> { if (self.isPending()) { diff --git a/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java b/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java index b3f596a37..b09548245 100644 --- a/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java +++ b/rsocket-core/src/test/java/io/rsocket/exceptions/ExceptionsTest.java @@ -217,6 +217,8 @@ void fromCustomRSocketException() { assertThat(Exceptions.from(0, byteBuf)) .hasMessage("Invalid Error frame in Stream ID 0: 0x%08X '%s'", randomCode, "test-message") .isInstanceOf(IllegalArgumentException.class); + + byteBuf.release(); } } diff --git a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java index 271c08664..5177a65be 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/UnboundedProcessorTest.java @@ -28,13 +28,11 @@ import java.time.Duration; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.RepeatedTest; -import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import reactor.core.Fuseable; import reactor.core.publisher.Hooks; -import reactor.core.publisher.Operators; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.util.RaceTestUtils; @@ -110,7 +108,7 @@ public void testPrioritizedSending(boolean fusedCase) { public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnabled) { final LeaksTrackingByteBufAllocator allocator = LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 10000; i++) { final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); final ByteBuf buffer1 = allocator.buffer(1); @@ -123,68 +121,247 @@ public void ensureUnboundedProcessorDisposesQueueProperly(boolean withFusionEnab unboundedProcessor.subscribe(assertSubscriber); RaceTestUtils.race( - () -> - RaceTestUtils.race( - () -> - RaceTestUtils.race( - () -> { - unboundedProcessor.onNext(buffer1); - unboundedProcessor.onNext(buffer2); - }, - unboundedProcessor::dispose, - Schedulers.elastic()), - assertSubscriber::cancel, - Schedulers.elastic()), + () -> { + unboundedProcessor.onNext(buffer1); + unboundedProcessor.onNext(buffer2); + }, + unboundedProcessor::dispose, + assertSubscriber::cancel, () -> { assertSubscriber.request(1); assertSubscriber.request(1); + }); + + assertSubscriber.values().forEach(ReferenceCountUtil::release); + + allocator.assertHasNoLeaks(); + } + } + + @ParameterizedTest( + name = + "Ensures that racing between onNext | dispose | cancel | request(n) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]") + @ValueSource(booleans = {true, false}) + public void smokeTest1(boolean withFusionEnabled) { + final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + final RuntimeException runtimeException = new RuntimeException("test"); + for (int i = 0; i < 10000; i++) { + final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); + + final ByteBuf buffer1 = allocator.buffer(1); + final ByteBuf buffer2 = allocator.buffer(2); + final ByteBuf buffer3 = allocator.buffer(3); + final ByteBuf buffer4 = allocator.buffer(4); + + final AssertSubscriber assertSubscriber = + new AssertSubscriber(0) + .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE); + + unboundedProcessor.subscribe(assertSubscriber); + + RaceTestUtils.race( + () -> { + unboundedProcessor.onNext(buffer1); + unboundedProcessor.onNextPrioritized(buffer2); + }, + () -> { + unboundedProcessor.onNextPrioritized(buffer3); + unboundedProcessor.onNext(buffer4); + }, + unboundedProcessor::dispose, + unboundedProcessor::onComplete, + () -> unboundedProcessor.onError(runtimeException), + assertSubscriber::cancel, + () -> { + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + }); + + assertSubscriber.values().forEach(ReferenceCountUtil::release); + + allocator.assertHasNoLeaks(); + } + } + + @ParameterizedTest( + name = + "Ensures that racing between onNext | dispose | subscribe | request(n) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]") + @ValueSource(booleans = {true, false}) + @Disabled("hard to support in 1.0.x") + public void smokeTest2(boolean withFusionEnabled) { + final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + final RuntimeException runtimeException = new RuntimeException("test"); + for (int i = 0; i < 10000; i++) { + final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); + + final ByteBuf buffer1 = allocator.buffer(1); + final ByteBuf buffer2 = allocator.buffer(2); + final ByteBuf buffer3 = allocator.buffer(3); + final ByteBuf buffer4 = allocator.buffer(4); + + final AssertSubscriber assertSubscriber = + new AssertSubscriber(0) + .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE); + + RaceTestUtils.race( + Schedulers.boundedElastic(), + () -> { + unboundedProcessor.onNext(buffer1); + unboundedProcessor.onNextPrioritized(buffer2); + }, + () -> { + unboundedProcessor.onNextPrioritized(buffer3); + unboundedProcessor.onNext(buffer4); }, - Schedulers.elastic()); + unboundedProcessor::dispose, + unboundedProcessor::onComplete, + () -> unboundedProcessor.onError(runtimeException), + () -> { + unboundedProcessor.subscribe(assertSubscriber); + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + }); - assertSubscriber.values().forEach(ReferenceCountUtil::safeRelease); + assertSubscriber.values().forEach(ReferenceCountUtil::release); allocator.assertHasNoLeaks(); } } - @RepeatedTest( + @ParameterizedTest( name = - "Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks", - value = 100000) - @Timeout(60) - public void ensuresAsyncFusionAndDisposureHasNoDeadlock() { - // TODO: enable leaks tracking - // final LeaksTrackingByteBufAllocator allocator = - // LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); - final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); - - // final ByteBuf buffer1 = allocator.buffer(1); - // final ByteBuf buffer2 = allocator.buffer(2); - - final AssertSubscriber assertSubscriber = - new AssertSubscriber<>(Operators.enableOnDiscard(null, ReferenceCountUtil::safeRelease)); - - unboundedProcessor.publishOn(Schedulers.parallel()).subscribe(assertSubscriber); - - RaceTestUtils.race( - () -> { - // unboundedProcessor.onNext(buffer1); - // unboundedProcessor.onNext(buffer2); - unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER); - unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER); - unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER); - unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER); - unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER); - unboundedProcessor.onNext(Unpooled.EMPTY_BUFFER); - unboundedProcessor.dispose(); - }, - unboundedProcessor::dispose); - - assertSubscriber - .await(Duration.ofSeconds(50)) - .values() - .forEach(ReferenceCountUtil::safeRelease); - - // allocator.assertHasNoLeaks(); + "Ensures that racing between onNext | dispose | subscribe(cancelled) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]") + @ValueSource(booleans = {true, false}) + public void smokeTest3(boolean withFusionEnabled) { + final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + final RuntimeException runtimeException = new RuntimeException("test"); + for (int i = 0; i < 10000; i++) { + final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); + + final ByteBuf buffer1 = allocator.buffer(1); + final ByteBuf buffer2 = allocator.buffer(2); + final ByteBuf buffer3 = allocator.buffer(3); + final ByteBuf buffer4 = allocator.buffer(4); + + final AssertSubscriber assertSubscriber = + new AssertSubscriber(0) + .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE); + + assertSubscriber.cancel(); + + RaceTestUtils.race( + Schedulers.boundedElastic(), + () -> { + unboundedProcessor.onNext(buffer1); + unboundedProcessor.onNextPrioritized(buffer2); + }, + () -> { + unboundedProcessor.onNextPrioritized(buffer3); + unboundedProcessor.onNext(buffer4); + }, + unboundedProcessor::dispose, + unboundedProcessor::onComplete, + () -> unboundedProcessor.onError(runtimeException), + () -> unboundedProcessor.subscribe(assertSubscriber)); + + assertSubscriber.values().forEach(ReferenceCountUtil::release); + + allocator.assertHasNoLeaks(); + } + } + + @ParameterizedTest( + name = + "Ensures that racing between onNext | dispose | subscribe(cancelled) | terminal will not cause any issues and leaks; mode[fusionEnabled={0}]") + @ValueSource(booleans = {true, false}) + public void smokeTest31(boolean withFusionEnabled) { + final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + final RuntimeException runtimeException = new RuntimeException("test"); + for (int i = 0; i < 10000; i++) { + final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); + + final ByteBuf buffer1 = allocator.buffer(1); + final ByteBuf buffer2 = allocator.buffer(2); + final ByteBuf buffer3 = allocator.buffer(3); + final ByteBuf buffer4 = allocator.buffer(4); + + final AssertSubscriber assertSubscriber = + new AssertSubscriber(0) + .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE); + + RaceTestUtils.race( + Schedulers.boundedElastic(), + () -> { + unboundedProcessor.onNext(buffer1); + unboundedProcessor.onNextPrioritized(buffer2); + }, + () -> { + unboundedProcessor.onNextPrioritized(buffer3); + unboundedProcessor.onNext(buffer4); + }, + unboundedProcessor::dispose, + unboundedProcessor::onComplete, + () -> unboundedProcessor.onError(runtimeException), + () -> unboundedProcessor.subscribe(assertSubscriber), + () -> { + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + assertSubscriber.request(1); + }, + assertSubscriber::cancel); + + assertSubscriber.values().forEach(ReferenceCountUtil::release); + allocator.assertHasNoLeaks(); + } + } + + @ParameterizedTest( + name = + "Ensures that racing between onNext + dispose | downstream async drain should not cause any issues and leaks; mode[fusionEnabled={0}]") + @ValueSource(booleans = {true, false}) + public void ensuresAsyncFusionAndDisposureHasNoDeadlock(boolean withFusionEnabled) { + final LeaksTrackingByteBufAllocator allocator = + LeaksTrackingByteBufAllocator.instrument(ByteBufAllocator.DEFAULT); + + for (int i = 0; i < 10000; i++) { + final UnboundedProcessor unboundedProcessor = new UnboundedProcessor<>(); + final ByteBuf buffer1 = allocator.buffer(1); + final ByteBuf buffer2 = allocator.buffer(2); + final ByteBuf buffer3 = allocator.buffer(3); + final ByteBuf buffer4 = allocator.buffer(4); + final ByteBuf buffer5 = allocator.buffer(5); + final ByteBuf buffer6 = allocator.buffer(6); + + final AssertSubscriber assertSubscriber = + new AssertSubscriber() + .requestedFusionMode(withFusionEnabled ? Fuseable.ANY : Fuseable.NONE); + + unboundedProcessor.subscribe(assertSubscriber); + + RaceTestUtils.race( + () -> { + unboundedProcessor.onNext(buffer1); + unboundedProcessor.onNext(buffer2); + unboundedProcessor.onNext(buffer3); + unboundedProcessor.onNext(buffer4); + unboundedProcessor.onNext(buffer5); + unboundedProcessor.onNext(buffer6); + unboundedProcessor.dispose(); + }, + unboundedProcessor::dispose); + + assertSubscriber.await(Duration.ofSeconds(50)).values().forEach(ReferenceCountUtil::release); + } + + allocator.assertHasNoLeaks(); } } diff --git a/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java b/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java index 28206b4ff..54b99c797 100644 --- a/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java +++ b/rsocket-core/src/test/java/io/rsocket/internal/subscriber/AssertSubscriber.java @@ -864,8 +864,11 @@ public void cancel() { if (a != null && a != Operators.cancelledSubscription()) { a.cancel(); - if (establishedFusionMode == Fuseable.ASYNC && WIP.getAndIncrement(this) == 0) { - qs.clear(); + if (establishedFusionMode == Fuseable.ASYNC) { + final int previousState = markWorkAdded(); + if (!isWorkInProgress(previousState)) { + clearAndFinalize(); + } } } } @@ -924,11 +927,54 @@ public void onNext(T t) { } } - void drain() { - if (this.wip != 0 || WIP.getAndIncrement(this) != 0) { - if (isCancelled()) { - qs.clear(); + static boolean isFinalized(int state) { + return state == Integer.MIN_VALUE; + } + + static boolean isWorkInProgress(int state) { + return state > 0; + } + + int markWorkAdded() { + for (; ; ) { + int state = this.wip; + + if (isFinalized(state)) { + return state; + } + + if ((state & Integer.MAX_VALUE) == Integer.MAX_VALUE) { + return state; + } + int nextState = state + 1; + + if (WIP.compareAndSet(this, state, nextState)) { + return state; + } + } + } + + void clearAndFinalize() { + final Fuseable.QueueSubscription qs = this.qs; + for (; ; ) { + int state = this.wip; + + qs.clear(); + + if (WIP.compareAndSet(this, state, Integer.MIN_VALUE)) { + return; } + } + } + + void drain() { + final int previousState = markWorkAdded(); + if (isWorkInProgress(previousState)) { + return; + } + + if (isFinalized(previousState)) { + qs.clear(); return; } @@ -936,14 +982,14 @@ void drain() { int m = 1; for (; ; ) { if (isCancelled()) { - qs.clear(); + clearAndFinalize(); break; } boolean done = this.done; t = qs.poll(); if (t == null) { if (done) { - qs.clear(); // clear upstream to terminated it due to the contract + clearAndFinalize(); cdl.countDown(); return; } @@ -973,39 +1019,41 @@ public void onSubscribe(Subscription s) { subscriptionCount++; int requestMode = requestedFusionMode; if (requestMode >= 0) { - if (!setWithoutRequesting(s)) { - if (!isCancelled()) { - errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount)); - } - } else { - if (s instanceof Fuseable.QueueSubscription) { - this.qs = (Fuseable.QueueSubscription) s; + if (s instanceof Fuseable.QueueSubscription) { + this.qs = (Fuseable.QueueSubscription) s; - int m = qs.requestFusion(requestMode); - establishedFusionMode = m; + int m = qs.requestFusion(requestMode); + establishedFusionMode = m; - if (m == Fuseable.SYNC) { - for (; ; ) { - T v = qs.poll(); - if (v == null) { - onComplete(); - break; - } + if (!setWithoutRequesting(s)) { + qs.clear(); + if (!isCancelled()) { + errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount)); + } + return; + } - onNext(v); + if (m == Fuseable.SYNC) { + for (; ; ) { + T v = qs.poll(); + if (v == null) { + onComplete(); + break; } - } else { - requestDeferred(); + + onNext(v); } } else { requestDeferred(); } + + return; } - } else { - if (!set(s)) { - if (!isCancelled()) { - errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount)); - } + } + + if (!set(s)) { + if (!isCancelled()) { + errors.add(new IllegalStateException("Subscription already set: " + subscriptionCount)); } } } From 98fd18966e84e596cbb23f8ea55491d79b40854e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Wed, 19 May 2021 00:01:29 +0300 Subject: [PATCH 2/2] migrates from travis Signed-off-by: Oleh Dokuka Signed-off-by: Oleh Dokuka --- .github/workflows/gradle-all.yml | 45 +++++++++++++++++++++++ .github/workflows/gradle-main.yml | 53 ++++++++++++++++++++++++++++ .github/workflows/gradle-pr.yml | 31 ++++++++++++++++ .github/workflows/gradle-release.yml | 44 +++++++++++++++++++++++ .travis.yml | 45 ----------------------- ci/travis.sh | 44 ----------------------- 6 files changed, 173 insertions(+), 89 deletions(-) create mode 100644 .github/workflows/gradle-all.yml create mode 100644 .github/workflows/gradle-main.yml create mode 100644 .github/workflows/gradle-pr.yml create mode 100644 .github/workflows/gradle-release.yml delete mode 100644 .travis.yml delete mode 100755 ci/travis.sh diff --git a/.github/workflows/gradle-all.yml b/.github/workflows/gradle-all.yml new file mode 100644 index 000000000..8540539bb --- /dev/null +++ b/.github/workflows/gradle-all.yml @@ -0,0 +1,45 @@ +name: Branches Java CI + +on: + # Trigger the workflow on push + # but only for the non master/1.0.x branches + push: + branches-ignore: + - 1.0.x + - master + +jobs: + build: + + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ ubuntu-latest ] + jdk: [ 1.8, 11, 14 ] + fail-fast: false + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.jdk }} + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.jdk }} + - name: Cache Gradle packages + uses: actions/cache@v1 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + restore-keys: ${{ runner.os }}-gradle + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Build with Gradle + run: ./gradlew clean build + - name: Publish Packages to Artifactory + if: ${{ matrix.jdk == '1.8' }} + run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PversionSuffix="-${githubRef#refs/heads/}-SNAPSHOT" -PbuildNumber="${buildNumber}" artifactoryPublish --stacktrace + env: + bintrayUser: ${{ secrets.bintrayUser }} + bintrayKey: ${{ secrets.bintrayKey }} + githubRef: ${{ github.ref }} + buildNumber: ${{ github.run_number }} \ No newline at end of file diff --git a/.github/workflows/gradle-main.yml b/.github/workflows/gradle-main.yml new file mode 100644 index 000000000..d8ba3c3d5 --- /dev/null +++ b/.github/workflows/gradle-main.yml @@ -0,0 +1,53 @@ +name: Main Branches Java CI + +on: + # Trigger the workflow on push + # but only for the master/1.0.x branch + push: + branches: + - master + - 1.0.x + +jobs: + build: + + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ ubuntu-latest ] + jdk: [ 1.8, 11, 14 ] + fail-fast: false + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.jdk }} + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.jdk }} + - name: Cache Gradle packages + uses: actions/cache@v1 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + restore-keys: ${{ runner.os }}-gradle + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Build with Gradle + run: ./gradlew clean build + - name: Publish Packages to Artifactory + if: ${{ matrix.jdk == '1.8' }} + run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PversionSuffix="-SNAPSHOT" -PbuildNumber="${buildNumber}" artifactoryPublish --stacktrace + env: + bintrayUser: ${{ secrets.bintrayUser }} + bintrayKey: ${{ secrets.bintrayKey }} + buildNumber: ${{ github.run_number }} + - name: Aggregate test reports with ciMate + if: always() + continue-on-error: true + env: + CIMATE_PROJECT_ID: m84qx17y + run: | + wget -q https://get.cimate.io/release/linux/cimate + chmod +x cimate + ./cimate "**/TEST-*.xml" \ No newline at end of file diff --git a/.github/workflows/gradle-pr.yml b/.github/workflows/gradle-pr.yml new file mode 100644 index 000000000..994450faf --- /dev/null +++ b/.github/workflows/gradle-pr.yml @@ -0,0 +1,31 @@ +name: Pull Request Java CI + +on: [pull_request] + +jobs: + build: + + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ ubuntu-latest ] + jdk: [ 1.8, 11, 14 ] + fail-fast: false + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK ${{ matrix.jdk }} + uses: actions/setup-java@v1 + with: + java-version: ${{ matrix.jdk }} + - name: Cache Gradle packages + uses: actions/cache@v1 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + restore-keys: ${{ runner.os }}-gradle + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Build with Gradle + run: ./gradlew clean build \ No newline at end of file diff --git a/.github/workflows/gradle-release.yml b/.github/workflows/gradle-release.yml new file mode 100644 index 000000000..08f2698dc --- /dev/null +++ b/.github/workflows/gradle-release.yml @@ -0,0 +1,44 @@ +name: Release Java CI + +on: + # Trigger the workflow on push + push: + # Sequence of patterns matched against refs/tags + tags: + - '*' # Push events to matching *, i.e. 1.0, 20.15.10 + +jobs: + publish: + + runs-on: ${{ matrix.os }} + + strategy: + matrix: + os: [ ubuntu-latest ] + fail-fast: false + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Cache Gradle packages + uses: actions/cache@v1 + with: + path: ~/.gradle/caches + key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }} + restore-keys: ${{ runner.os }}-gradle + - name: Grant execute permission for gradlew + run: chmod +x gradlew + - name: Build with Gradle + run: ./gradlew clean build + - name: Publish Packages to Bintray + run: ./gradlew -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" -Pversion="${githubRef#refs/tags/}" -PbuildNumber="${buildNumber}" bintrayUpload + env: + bintrayUser: ${{ secrets.bintrayUser }} + bintrayKey: ${{ secrets.bintrayKey }} + sonatypeUsername: ${{ secrets.sonatypeUsername }} + sonatypePassword: ${{ secrets.sonatypePassword }} + githubRef: ${{ github.ref }} + buildNumber: ${{ github.run_number }} \ No newline at end of file diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 4722957c8..000000000 --- a/.travis.yml +++ /dev/null @@ -1,45 +0,0 @@ -# -# Copyright 2015-2018 the original author or authors. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ---- -language: java - -dist: trusty - -matrix: - include: - - jdk: openjdk8 - - jdk: openjdk11 - env: SKIP_RELEASE=true - - jdk: openjdk14 - env: SKIP_RELEASE=true - -env: - global: - - secure: "WBCy0hsF96Xybj4n0AUrGY2m5FWCUa30XR+aVElSOO8d7v7BMypAT8mAd+yC2Y+j8WUGpIv59CqgeK1JrYdR9b3qRKhJmoE1Q92TotrxXMTIC9OKuU51LaaOqGYqx4SqiA2AyaikTFPd8um7KZfUpW/dG4IXySsiJ2OKT1jMUq6TmbWHnAYtjbl3u3WdjBQTIZNMtqG1+H1vIpsWyZrvbB4TWlNzhKBAu/YnlzMtvStrDaF7XrCJ2BQdMomQO18NH2gWxUEvLbQb6ip3wFl9CRe6vID7K1dmFwm08RPt9hRPC9yDahlIy8VvuNcWrP42TV+BVYy8V/hfaIo1pPsDBrtmVyc7YZjXSUM68orDFOkRB35qGkNIaAhy5Yt6G9QfwLXJkDFofW5KMKtDFUzf+j4DwS0CiDMF4k6Qq7YN1tYFXE9R8xa6Gv+wTNHqs4RURbYMS9IlbkhKxNbtyuema2sIUbsIfDezIzLI5BnfH2uli7O6/z0/G0Vfmf6A4q5Olm+7uhzMTI0GKheUIKr16SOxABlrwJtLJftzoKz9hYd3b7C9t61vYzccC3rWYobplwIcK2w50gFHQS8HLeiCjo8yjCx+IRSvAGaZIBPQdHCktrEYCVDUTXOxdaD6k6Ef+ppm8Nn+M+iC8x/G1wYE4x1lDqHw3GfhKsEQmiHL/98=" - - secure: "mbB+rv9eWUFQ9/yr2REH2ztH6r/Uq7cq/OJ5WK6yFp0TmPzlJ8jbEVwe/sdAMW2E4qrfMu1c2h3qsVm41pNx0MwEsIW/lTIZRiRmNYon32n+SHlRWyTn8dJeY/p1HoHs450OjLgB4X4jmRmfSt8IQ/w9ZCjF6HVcgR4ctt+myECTNcRidEIOahljnSJmnFFDsKbt2UJN96AfvvhbxcarEKgKLXLd9tQT2GlvEOM+hVOY9hKD5FvIoRp9heyCEAsSBXe+MIWQlh4jx+B4zCajZJ+8KN6M8KIt40lV8z4Zbc11jgq/xULJwkQIuVZvkJ3huIfUrxwLPgYWeai/TR/m3+2jy1hFajt96pnhJzFEz0IBL0wFALwAY1n2R/6uugEUYnDsFcGQGTsO5OeeOixiRPH5HNgfOhInqJoFh/887f+gq7OLXjlRCTsw+S9KknZ3iBpHX/+khurfAUC9khiMvufEq6Wyu0TvxhmGERFrs7uugeJ1VA85SDVQ6Au9MV831PeBGqzHpYG7w2kJj1EiFjBRUhCthxyDfX2b04egozlKF8JEifZ9EVj7pNMQUvVG2c9Wj6M0fG84NusnlZlA16XxAmfLevc9b/BOSSrqc2r9Z1ZvxFnBPP9H94Uqt9ZninhW/T49jRF+lQzD45MTVogzVk77XtdpzUemf4t5mHc=" - - secure: "GcPu3U4o2Dp7QLCqaAo3mGMJTl9yd+w+elXqqt7WDjrjm5p8mrzvQfyiJA7mRJVDTGpgib8fLctL1X1+QOX4fNKElrDUFhE3bWAqwVwHGPK4D3HCb6THD5XVqE4qcPmdLWPkvJ9ZY5nSIfuRVASjZTcc4XSXISK2jUSGar0PNYlo62/OFGvNvMz/qINU9RU7iYdDlL19yd72TKDfuK0UOKhQEGypamEHam3SMNCw/p8Q5K1vQe+Oba3ILCvYHJvqWc2NLjRXJjXfIaOq/NpCK6Lx2U9etdpkb5lyW5Cx1lkzIcRUq8ZUCwbkHog9LJoZGrZFh5AzlZ6kRuejBqu7AISmZy4s9HVAb7AQmNxvXkK9EIt8lavcaHnLYUIfuxvBqK/ptcUN5P/KXCs1DsbpADjB7YbUu/EQ2OAWncV31Z+O4uMHV29eGTtaz9LoK28+mHRfFHqoazWyuUejor6iSSkrCeqsLEvU8o6rH4oenKz7hLlZsJqHGACYtYNYi2CXYlTu0bMX+Hb1EtTu6Awm9Gn04TqVdmNexgF5CdqW4A696i6jlkPpVCt4B4nq4VPs2RMTkjVl3B7uOkDm18u35dncuhgsnMfVmo9cWX5COeyefdh6kdnKsUf0+IPbV/hix/OCP72dpuhxgcyzN+DvaVLzX7YOx7TpJTzPSKNEQZc=" - - secure: "UFJEzDEv6H2Qscg9UgZFVJq5oFvq7nQkVoSuGfh5Y4ZhL9PCK5f3Ft9oYEZOQwXaxWD1qivtJjQV3DdBiqsHkrnPrJ0hi3iYVDJo26xLNtu3welFw5Veqmgu2NuwjaDn6cjRFCJRLzpszMUWO1DvfLJTs3LuJDuXEyAKDw9eQgfOakqO4xeloyXgM7xnoXz11rgqtJNU6snjVPHftXNPTHGsNDlTR7SAIbjYwLMbdIKM2qjzrXkg+a94QOz2stnTDz9V5iYNH+3XXCcYxD9nb1Ol1XGWvtDnNGEhtGmylLdjHXwGLHiW2HOXskLzSkm7ASie1WdyHVHZb4X8LjxCy62S0FPevBgat1a443Khx5HCMYR/8dQrlOI82GYTr8n9U6QQE4Li8XLw64DVP9HGs9jdbsfEdlIsiPWqB6ujlwiO6pyfmQGQCgjALA+oD87uDQLcgh+SDYgE0ZwmwGzbjeynZpoCrEE8A1GHhSwkM9khx6EJFacm9XzqoUGK0wB1f8su+51fqPglF1zye80IFA4wOMMAY+KUc9du/vQ98f0lfjsNSOC02CKYxbA5RaakQMAYjirsZraA57xLmCSIGMhhW4wClQdJBww6LLz463yZU4WPwyqU+ZW12aV5dVLb5RWXIbZKmdT74DfZajHvqgTYpb05L5cJl7ApMspUkKk=" - -script: ci/travis.sh - -before_cache: -- rm -f $HOME/.gradle/caches/modules-2/modules-2.lock -- rm -fr $HOME/.gradle/caches/*/plugin-resolution/ - -cache: - directories: - - $HOME/.gradle/caches/ - - $HOME/.gradle/wrapper/ diff --git a/ci/travis.sh b/ci/travis.sh deleted file mode 100755 index df3fc1245..000000000 --- a/ci/travis.sh +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env bash - -if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then - - echo -e "Building PR #$TRAVIS_PULL_REQUEST [$TRAVIS_PULL_REQUEST_SLUG/$TRAVIS_PULL_REQUEST_BRANCH => $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH]" - ./gradlew build - -elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ] && [ "$bintrayUser" != "" ] && [ "$TRAVIS_BRANCH" == "1.0.x" ] ; then - - echo -e "Building Develop Snapshot $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH/$TRAVIS_BUILD_NUMBER" - ./gradlew \ - -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \ - -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \ - -PversionSuffix="-SNAPSHOT" \ - -PbuildNumber="$TRAVIS_BUILD_NUMBER" \ - build artifactoryPublish --stacktrace - -elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ] && [ "$bintrayUser" != "" ] ; then - - echo -e "Building Branch Snapshot $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH/$TRAVIS_BUILD_NUMBER" - ./gradlew \ - -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \ - -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \ - -PversionSuffix="-${TRAVIS_BRANCH//\//-}-SNAPSHOT" \ - -PbuildNumber="$TRAVIS_BUILD_NUMBER" \ - build artifactoryPublish --stacktrace - -elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ] && [ "$bintrayUser" != "" ] ; then - - echo -e "Building Tag $TRAVIS_REPO_SLUG/$TRAVIS_TAG" - ./gradlew \ - -Pversion="$TRAVIS_TAG" \ - -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" \ - -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" \ - -PbuildNumber="$TRAVIS_BUILD_NUMBER" \ - build bintrayUpload --stacktrace - -else - - echo -e "Building $TRAVIS_REPO_SLUG/$TRAVIS_BRANCH" - ./gradlew build - -fi -