From c2d5ab142aa9b1ab7418db1268d5d3ee9cb4973e Mon Sep 17 00:00:00 2001 From: Oleh Dokuka Date: Fri, 4 Dec 2020 19:48:35 +0200 Subject: [PATCH] ensures that context of `actual` subscriber is available for LiftOperator Signed-off-by: Oleh Dokuka --- .../src/main/java/io/rsocket/core/RequestOperator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java index f95a5f66c..38c392408 100644 --- a/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java +++ b/rsocket-core/src/main/java/io/rsocket/core/RequestOperator.java @@ -23,6 +23,7 @@ abstract class RequestOperator Fuseable.QueueSubscription, Fuseable { + final CorePublisher source; final String errorMessageOnSecondSubscription; CoreSubscriber actual; @@ -38,8 +39,8 @@ abstract class RequestOperator AtomicIntegerFieldUpdater.newUpdater(RequestOperator.class, "wip"); RequestOperator(CorePublisher source, String errorMessageOnSecondSubscription) { + this.source = source; this.errorMessageOnSecondSubscription = errorMessageOnSecondSubscription; - source.subscribe(this); WIP.lazySet(this, -1); } @@ -52,6 +53,7 @@ public void subscribe(Subscriber actual) { public void subscribe(CoreSubscriber actual) { if (this.wip == -1 && WIP.compareAndSet(this, -1, 0)) { this.actual = actual; + source.subscribe(this); actual.onSubscribe(this); } else { Operators.error(actual, new IllegalStateException(this.errorMessageOnSecondSubscription));