diff --git a/src/main/java/io/reactivesocket/internal/Responder.java b/src/main/java/io/reactivesocket/internal/Responder.java index 021f316d3..dfead22ac 100644 --- a/src/main/java/io/reactivesocket/internal/Responder.java +++ b/src/main/java/io/reactivesocket/internal/Responder.java @@ -716,10 +716,11 @@ public void request(long n) { // after we are first subscribed to then send // the initial frame s.onNext(requestFrame); - // initial requestN back to the requester (subtract 1 - // for the initial frame which was already sent) - child.onNext( - Frame.RequestN.from(streamId, rn.intValue() - 1)); + if (rn.intValue() > 0) { + // initial requestN back to the requester (subtract 1 + // for the initial frame which was already sent) + child.onNext(Frame.RequestN.from(streamId, rn.intValue() - 1)); + } }, r -> { // requested child.onNext(Frame.RequestN.from(streamId, r.intValue())); diff --git a/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java b/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java index 0b9f7e547..a2ddbd3a8 100644 --- a/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java +++ b/src/test/java/io/reactivesocket/internal/UnicastSubjectTest.java @@ -23,6 +23,8 @@ import io.reactivesocket.internal.UnicastSubject; import io.reactivex.subscribers.TestSubscriber; +import static org.junit.Assert.assertTrue; + public class UnicastSubjectTest { @Test @@ -52,7 +54,10 @@ public void testIllegalStateIfMultiSubscribe() { us.subscribe(f2); f1.assertNotTerminated(); - f2.assertError(IllegalStateException.class); + for (Throwable e : f2.errors()) { + assertTrue( IllegalStateException.class.isInstance(e) + || NullPointerException.class.isInstance(e)); + } } }