2727import org .reactivestreams .Subscription ;
2828import reactor .core .CoreSubscriber ;
2929import reactor .core .Fuseable ;
30- import reactor .core .Scannable ;
3130import reactor .core .publisher .Flux ;
3231import reactor .core .publisher .Mono ;
3332import reactor .core .publisher .Operators ;
34- import reactor .core .publisher .Sinks ;
3533
3634/** An implementation of {@link DuplexConnection} that connects inside the same JVM. */
3735final class LocalDuplexConnection implements DuplexConnection {
@@ -40,7 +38,7 @@ final class LocalDuplexConnection implements DuplexConnection {
4038 private final ByteBufAllocator allocator ;
4139 private final Flux <ByteBuf > in ;
4240
43- private final Sinks . Empty <Void > onClose ;
41+ private final Mono <Void > onClose ;
4442
4543 private final UnboundedProcessor out ;
4644
@@ -58,7 +56,7 @@ final class LocalDuplexConnection implements DuplexConnection {
5856 ByteBufAllocator allocator ,
5957 Flux <ByteBuf > in ,
6058 UnboundedProcessor out ,
61- Sinks . Empty <Void > onClose ) {
59+ Mono <Void > onClose ) {
6260 this .address = new LocalSocketAddress (name );
6361 this .allocator = Objects .requireNonNull (allocator , "allocator must not be null" );
6462 this .in = Objects .requireNonNull (in , "in must not be null" );
@@ -69,24 +67,23 @@ final class LocalDuplexConnection implements DuplexConnection {
6967 @ Override
7068 public void dispose () {
7169 out .onComplete ();
72- onClose .tryEmitEmpty ();
7370 }
7471
7572 @ Override
76- @ SuppressWarnings ("ConstantConditions" )
7773 public boolean isDisposed () {
78- return onClose . scan ( Scannable . Attr . TERMINATED ) || onClose . scan ( Scannable . Attr . CANCELLED );
74+ return out . isDisposed ( );
7975 }
8076
8177 @ Override
8278 public Mono <Void > onClose () {
83- return onClose . asMono () ;
79+ return onClose ;
8480 }
8581
8682 @ Override
8783 public Flux <ByteBuf > receive () {
8884 return in .transform (
89- Operators .<ByteBuf , ByteBuf >lift ((__ , actual ) -> new ByteBufReleaserOperator (actual )));
85+ Operators .<ByteBuf , ByteBuf >lift (
86+ (__ , actual ) -> new ByteBufReleaserOperator (actual , this )));
9087 }
9188
9289 @ Override
@@ -119,11 +116,14 @@ static class ByteBufReleaserOperator
119116 implements CoreSubscriber <ByteBuf >, Subscription , Fuseable .QueueSubscription <ByteBuf > {
120117
121118 final CoreSubscriber <? super ByteBuf > actual ;
119+ final LocalDuplexConnection parent ;
122120
123121 Subscription s ;
124122
125- public ByteBufReleaserOperator (CoreSubscriber <? super ByteBuf > actual ) {
123+ public ByteBufReleaserOperator (
124+ CoreSubscriber <? super ByteBuf > actual , LocalDuplexConnection parent ) {
126125 this .actual = actual ;
126+ this .parent = parent ;
127127 }
128128
129129 @ Override
@@ -136,17 +136,22 @@ public void onSubscribe(Subscription s) {
136136
137137 @ Override
138138 public void onNext (ByteBuf buf ) {
139- actual .onNext (buf );
140- buf .release ();
139+ try {
140+ actual .onNext (buf );
141+ } finally {
142+ buf .release ();
143+ }
141144 }
142145
143146 @ Override
144147 public void onError (Throwable t ) {
148+ parent .out .onError (t );
145149 actual .onError (t );
146150 }
147151
148152 @ Override
149153 public void onComplete () {
154+ parent .out .onComplete ();
150155 actual .onComplete ();
151156 }
152157
@@ -158,6 +163,7 @@ public void request(long n) {
158163 @ Override
159164 public void cancel () {
160165 s .cancel ();
166+ parent .out .onComplete ();
161167 }
162168
163169 @ Override
0 commit comments