Skip to content

Commit 80186ea

Browse files
margelatuslandelle
authored andcommitted
Update AbstractMaybeAsyncHandlerBridge to forward all events to the delegate AsyncHandler, fixes #1635 (#1636)
1 parent 8b65942 commit 80186ea

File tree

2 files changed

+209
-4
lines changed

2 files changed

+209
-4
lines changed

extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package org.asynchttpclient.extras.rxjava2.maybe;
1515

16+
import io.netty.channel.Channel;
1617
import io.netty.handler.codec.http.HttpHeaders;
1718
import io.reactivex.MaybeEmitter;
1819
import io.reactivex.exceptions.CompositeException;
@@ -21,10 +22,14 @@
2122
import org.asynchttpclient.HttpResponseBodyPart;
2223
import org.asynchttpclient.HttpResponseStatus;
2324
import org.asynchttpclient.extras.rxjava2.DisposedException;
25+
import org.asynchttpclient.netty.request.NettyRequest;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
2628

29+
import javax.net.ssl.SSLSession;
30+
import java.net.InetSocketAddress;
2731
import java.util.Arrays;
32+
import java.util.List;
2833
import java.util.concurrent.atomic.AtomicBoolean;
2934

3035
import static java.util.Objects.requireNonNull;
@@ -144,6 +149,76 @@ public final void onThrowable(Throwable t) {
144149
emitOnError(error);
145150
}
146151

152+
@Override
153+
public void onHostnameResolutionAttempt(String name) {
154+
executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionAttempt(name));
155+
}
156+
157+
@Override
158+
public void onHostnameResolutionSuccess(String name, List<InetSocketAddress> addresses) {
159+
executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionSuccess(name, addresses));
160+
}
161+
162+
@Override
163+
public void onHostnameResolutionFailure(String name, Throwable cause) {
164+
executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionFailure(name, cause));
165+
}
166+
167+
@Override
168+
public void onTcpConnectAttempt(InetSocketAddress remoteAddress) {
169+
executeUnlessEmitterDisposed(() -> delegate().onTcpConnectAttempt(remoteAddress));
170+
}
171+
172+
@Override
173+
public void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) {
174+
executeUnlessEmitterDisposed(() -> delegate().onTcpConnectSuccess(remoteAddress, connection));
175+
}
176+
177+
@Override
178+
public void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) {
179+
executeUnlessEmitterDisposed(() -> delegate().onTcpConnectFailure(remoteAddress, cause));
180+
}
181+
182+
@Override
183+
public void onTlsHandshakeAttempt() {
184+
executeUnlessEmitterDisposed(() -> delegate().onTlsHandshakeAttempt());
185+
}
186+
187+
@Override
188+
public void onTlsHandshakeSuccess(SSLSession sslSession) {
189+
executeUnlessEmitterDisposed(() -> delegate().onTlsHandshakeSuccess(sslSession));
190+
}
191+
192+
@Override
193+
public void onTlsHandshakeFailure(Throwable cause) {
194+
executeUnlessEmitterDisposed(() -> delegate().onTlsHandshakeFailure(cause));
195+
}
196+
197+
@Override
198+
public void onConnectionPoolAttempt() {
199+
executeUnlessEmitterDisposed(() -> delegate().onConnectionPoolAttempt());
200+
}
201+
202+
@Override
203+
public void onConnectionPooled(Channel connection) {
204+
executeUnlessEmitterDisposed(() -> delegate().onConnectionPooled(connection));
205+
}
206+
207+
@Override
208+
public void onConnectionOffer(Channel connection) {
209+
executeUnlessEmitterDisposed(() -> delegate().onConnectionOffer(connection));
210+
}
211+
212+
@Override
213+
public void onRequestSend(NettyRequest request) {
214+
executeUnlessEmitterDisposed(() -> delegate().onRequestSend(request));
215+
}
216+
217+
@Override
218+
public void onRetry() {
219+
executeUnlessEmitterDisposed(() -> delegate().onRetry());
220+
}
221+
147222
/**
148223
* Called to indicate that request processing is to be aborted because the linked Rx stream has been disposed. If
149224
* the {@link #delegate() delegate} didn't already receive a terminal event,
@@ -184,4 +259,12 @@ private void emitOnError(Throwable error) {
184259
LOGGER.debug("Not propagating onError after disposal: {}", error.getMessage(), error);
185260
}
186261
}
262+
263+
private void executeUnlessEmitterDisposed(Runnable runnable) {
264+
if (emitter.isDisposed()) {
265+
disposed();
266+
} else {
267+
runnable.run();
268+
}
269+
}
187270
}

extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridgeTest.java

Lines changed: 126 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package org.asynchttpclient.extras.rxjava2.maybe;
1515

16+
import io.netty.channel.Channel;
1617
import io.netty.handler.codec.http.HttpHeaders;
1718
import io.reactivex.MaybeEmitter;
1819
import io.reactivex.exceptions.CompositeException;
@@ -26,7 +27,10 @@
2627
import org.testng.annotations.DataProvider;
2728
import org.testng.annotations.Test;
2829

30+
import javax.net.ssl.SSLSession;
31+
import java.net.InetSocketAddress;
2932
import java.util.Arrays;
33+
import java.util.Collections;
3034
import java.util.List;
3135
import java.util.concurrent.Callable;
3236

@@ -35,10 +39,6 @@
3539
import static org.mockito.BDDMockito.*;
3640
import static org.mockito.Matchers.any;
3741
import static org.mockito.Matchers.isA;
38-
import static org.mockito.Mockito.never;
39-
import static org.mockito.Mockito.only;
40-
import static org.mockito.Mockito.times;
41-
import static org.mockito.Mockito.verify;
4242

4343
public class AbstractMaybeAsyncHandlerBridgeTest {
4444

@@ -57,6 +57,20 @@ public class AbstractMaybeAsyncHandlerBridgeTest {
5757
@Mock
5858
private HttpResponseBodyPart bodyPart;
5959

60+
private final String hostname = "service:8080";
61+
62+
@Mock
63+
private InetSocketAddress remoteAddress;
64+
65+
@Mock
66+
private Channel channel;
67+
68+
@Mock
69+
private SSLSession sslSession;
70+
71+
@Mock
72+
private Throwable error;
73+
6074
@Captor
6175
private ArgumentCaptor<Throwable> throwable;
6276

@@ -76,6 +90,20 @@ public T call() throws Exception {
7690
};
7791
}
7892

93+
private static Runnable named(String name, Runnable runnable) {
94+
return new Runnable() {
95+
@Override
96+
public String toString() {
97+
return name;
98+
}
99+
100+
@Override
101+
public void run() {
102+
runnable.run();
103+
}
104+
};
105+
}
106+
79107
@BeforeMethod
80108
public void initializeTest() {
81109
MockitoAnnotations.initMocks(this);
@@ -104,10 +132,68 @@ public void forwardsEvents() throws Exception {
104132
underTest.onTrailingHeadersReceived(headers);
105133
then(delegate).should().onTrailingHeadersReceived(headers);
106134

135+
/* when */
136+
underTest.onHostnameResolutionAttempt(hostname);
137+
then(delegate).should().onHostnameResolutionAttempt(hostname);
138+
139+
/* when */
140+
List<InetSocketAddress> remoteAddresses = Collections.singletonList(remoteAddress);
141+
underTest.onHostnameResolutionSuccess(hostname, remoteAddresses);
142+
then(delegate).should().onHostnameResolutionSuccess(hostname, remoteAddresses);
143+
144+
/* when */
145+
underTest.onHostnameResolutionFailure(hostname, error);
146+
then(delegate).should().onHostnameResolutionFailure(hostname, error);
147+
148+
/* when */
149+
underTest.onTcpConnectAttempt(remoteAddress);
150+
then(delegate).should().onTcpConnectAttempt(remoteAddress);
151+
152+
/* when */
153+
underTest.onTcpConnectSuccess(remoteAddress, channel);
154+
then(delegate).should().onTcpConnectSuccess(remoteAddress, channel);
155+
156+
/* when */
157+
underTest.onTcpConnectFailure(remoteAddress, error);
158+
then(delegate).should().onTcpConnectFailure(remoteAddress, error);
159+
160+
/* when */
161+
underTest.onTlsHandshakeAttempt();
162+
then(delegate).should().onTlsHandshakeAttempt();
163+
164+
/* when */
165+
underTest.onTlsHandshakeSuccess(sslSession);
166+
then(delegate).should().onTlsHandshakeSuccess(sslSession);
167+
168+
/* when */
169+
underTest.onTlsHandshakeFailure(error);
170+
then(delegate).should().onTlsHandshakeFailure(error);
171+
172+
/* when */
173+
underTest.onConnectionPoolAttempt();
174+
then(delegate).should().onConnectionPoolAttempt();
175+
176+
/* when */
177+
underTest.onConnectionPooled(channel);
178+
then(delegate).should().onConnectionPooled(channel);
179+
180+
/* when */
181+
underTest.onConnectionOffer(channel);
182+
then(delegate).should().onConnectionOffer(channel);
183+
184+
/* when */
185+
underTest.onRequestSend(null);
186+
then(delegate).should().onRequestSend(null);
187+
188+
/* when */
189+
underTest.onRetry();
190+
then(delegate).should().onRetry();
191+
107192
/* when */
108193
underTest.onCompleted();
109194
then(delegate).should().onCompleted();
110195
then(emitter).should().onSuccess(this);
196+
111197
/* then */
112198
verifyNoMoreInteractions(delegate);
113199
}
@@ -254,6 +340,42 @@ public void httpEventCallbacksCheckDisposal(Callable<AsyncHandler.State> httpEve
254340
verifyNoMoreInteractions(delegate);
255341
}
256342

343+
@DataProvider
344+
public Object[][] variousEvents() {
345+
return new Object[][]{
346+
{named("onHostnameResolutionAttempt", () -> underTest.onHostnameResolutionAttempt("service:8080"))},
347+
{named("onHostnameResolutionSuccess", () -> underTest.onHostnameResolutionSuccess("service:8080",
348+
Collections.singletonList(remoteAddress)))},
349+
{named("onHostnameResolutionFailure", () -> underTest.onHostnameResolutionFailure("service:8080", error))},
350+
{named("onTcpConnectAttempt", () -> underTest.onTcpConnectAttempt(remoteAddress))},
351+
{named("onTcpConnectSuccess", () -> underTest.onTcpConnectSuccess(remoteAddress, channel))},
352+
{named("onTcpConnectFailure", () -> underTest.onTcpConnectFailure(remoteAddress, error))},
353+
{named("onTlsHandshakeAttempt", () -> underTest.onTlsHandshakeAttempt())},
354+
{named("onTlsHandshakeSuccess", () -> underTest.onTlsHandshakeSuccess(sslSession))},
355+
{named("onTlsHandshakeFailure", () -> underTest.onTlsHandshakeFailure(error))},
356+
{named("onConnectionPoolAttempt", () -> underTest.onConnectionPoolAttempt())},
357+
{named("onConnectionPooled", () -> underTest.onConnectionPooled(channel))},
358+
{named("onConnectionOffer", () -> underTest.onConnectionOffer(channel))},
359+
{named("onRequestSend", () -> underTest.onRequestSend(null))},
360+
{named("onRetry", () -> underTest.onRetry())},
361+
};
362+
}
363+
364+
@Test(dataProvider = "variousEvents")
365+
public void variousEventCallbacksCheckDisposal(Runnable event) {
366+
given(emitter.isDisposed()).willReturn(true);
367+
368+
/* when */
369+
event.run();
370+
/* then */
371+
then(delegate).should(only()).onThrowable(isA(DisposedException.class));
372+
373+
/* when */
374+
event.run();
375+
/* then */
376+
verifyNoMoreInteractions(delegate);
377+
}
378+
257379
private final class UnderTest extends AbstractMaybeAsyncHandlerBridge<Object> {
258380
UnderTest() {
259381
super(AbstractMaybeAsyncHandlerBridgeTest.this.emitter);

0 commit comments

Comments
 (0)