Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.asynchttpclient.extras.rxjava2.maybe;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpHeaders;
import io.reactivex.MaybeEmitter;
import io.reactivex.exceptions.CompositeException;
Expand All @@ -21,10 +22,14 @@
import org.asynchttpclient.HttpResponseBodyPart;
import org.asynchttpclient.HttpResponseStatus;
import org.asynchttpclient.extras.rxjava2.DisposedException;
import org.asynchttpclient.netty.request.NettyRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.net.ssl.SSLSession;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -144,6 +149,76 @@ public final void onThrowable(Throwable t) {
emitOnError(error);
}

@Override
public void onHostnameResolutionAttempt(String name) {
executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionAttempt(name));
}

@Override
public void onHostnameResolutionSuccess(String name, List<InetSocketAddress> addresses) {
executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionSuccess(name, addresses));
}

@Override
public void onHostnameResolutionFailure(String name, Throwable cause) {
executeUnlessEmitterDisposed(() -> delegate().onHostnameResolutionFailure(name, cause));
}

@Override
public void onTcpConnectAttempt(InetSocketAddress remoteAddress) {
executeUnlessEmitterDisposed(() -> delegate().onTcpConnectAttempt(remoteAddress));
}

@Override
public void onTcpConnectSuccess(InetSocketAddress remoteAddress, Channel connection) {
executeUnlessEmitterDisposed(() -> delegate().onTcpConnectSuccess(remoteAddress, connection));
}

@Override
public void onTcpConnectFailure(InetSocketAddress remoteAddress, Throwable cause) {
executeUnlessEmitterDisposed(() -> delegate().onTcpConnectFailure(remoteAddress, cause));
}

@Override
public void onTlsHandshakeAttempt() {
executeUnlessEmitterDisposed(() -> delegate().onTlsHandshakeAttempt());
}

@Override
public void onTlsHandshakeSuccess(SSLSession sslSession) {
executeUnlessEmitterDisposed(() -> delegate().onTlsHandshakeSuccess(sslSession));
}

@Override
public void onTlsHandshakeFailure(Throwable cause) {
executeUnlessEmitterDisposed(() -> delegate().onTlsHandshakeFailure(cause));
}

@Override
public void onConnectionPoolAttempt() {
executeUnlessEmitterDisposed(() -> delegate().onConnectionPoolAttempt());
}

@Override
public void onConnectionPooled(Channel connection) {
executeUnlessEmitterDisposed(() -> delegate().onConnectionPooled(connection));
}

@Override
public void onConnectionOffer(Channel connection) {
executeUnlessEmitterDisposed(() -> delegate().onConnectionOffer(connection));
}

@Override
public void onRequestSend(NettyRequest request) {
executeUnlessEmitterDisposed(() -> delegate().onRequestSend(request));
}

@Override
public void onRetry() {
executeUnlessEmitterDisposed(() -> delegate().onRetry());
}

/**
* Called to indicate that request processing is to be aborted because the linked Rx stream has been disposed. If
* the {@link #delegate() delegate} didn't already receive a terminal event,
Expand Down Expand Up @@ -184,4 +259,12 @@ private void emitOnError(Throwable error) {
LOGGER.debug("Not propagating onError after disposal: {}", error.getMessage(), error);
}
}

private void executeUnlessEmitterDisposed(Runnable runnable) {
if (emitter.isDisposed()) {
disposed();
} else {
runnable.run();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.asynchttpclient.extras.rxjava2.maybe;

import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpHeaders;
import io.reactivex.MaybeEmitter;
import io.reactivex.exceptions.CompositeException;
Expand All @@ -26,7 +27,10 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import javax.net.ssl.SSLSession;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;

Expand All @@ -35,10 +39,6 @@
import static org.mockito.BDDMockito.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.only;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class AbstractMaybeAsyncHandlerBridgeTest {

Expand All @@ -57,6 +57,20 @@ public class AbstractMaybeAsyncHandlerBridgeTest {
@Mock
private HttpResponseBodyPart bodyPart;

private final String hostname = "service:8080";

@Mock
private InetSocketAddress remoteAddress;

@Mock
private Channel channel;

@Mock
private SSLSession sslSession;

@Mock
private Throwable error;

@Captor
private ArgumentCaptor<Throwable> throwable;

Expand All @@ -76,6 +90,20 @@ public T call() throws Exception {
};
}

private static Runnable named(String name, Runnable runnable) {
return new Runnable() {
@Override
public String toString() {
return name;
}

@Override
public void run() {
runnable.run();
}
};
}

@BeforeMethod
public void initializeTest() {
MockitoAnnotations.initMocks(this);
Expand Down Expand Up @@ -104,10 +132,68 @@ public void forwardsEvents() throws Exception {
underTest.onTrailingHeadersReceived(headers);
then(delegate).should().onTrailingHeadersReceived(headers);

/* when */
underTest.onHostnameResolutionAttempt(hostname);
then(delegate).should().onHostnameResolutionAttempt(hostname);

/* when */
List<InetSocketAddress> remoteAddresses = Collections.singletonList(remoteAddress);
underTest.onHostnameResolutionSuccess(hostname, remoteAddresses);
then(delegate).should().onHostnameResolutionSuccess(hostname, remoteAddresses);

/* when */
underTest.onHostnameResolutionFailure(hostname, error);
then(delegate).should().onHostnameResolutionFailure(hostname, error);

/* when */
underTest.onTcpConnectAttempt(remoteAddress);
then(delegate).should().onTcpConnectAttempt(remoteAddress);

/* when */
underTest.onTcpConnectSuccess(remoteAddress, channel);
then(delegate).should().onTcpConnectSuccess(remoteAddress, channel);

/* when */
underTest.onTcpConnectFailure(remoteAddress, error);
then(delegate).should().onTcpConnectFailure(remoteAddress, error);

/* when */
underTest.onTlsHandshakeAttempt();
then(delegate).should().onTlsHandshakeAttempt();

/* when */
underTest.onTlsHandshakeSuccess(sslSession);
then(delegate).should().onTlsHandshakeSuccess(sslSession);

/* when */
underTest.onTlsHandshakeFailure(error);
then(delegate).should().onTlsHandshakeFailure(error);

/* when */
underTest.onConnectionPoolAttempt();
then(delegate).should().onConnectionPoolAttempt();

/* when */
underTest.onConnectionPooled(channel);
then(delegate).should().onConnectionPooled(channel);

/* when */
underTest.onConnectionOffer(channel);
then(delegate).should().onConnectionOffer(channel);

/* when */
underTest.onRequestSend(null);
then(delegate).should().onRequestSend(null);

/* when */
underTest.onRetry();
then(delegate).should().onRetry();

/* when */
underTest.onCompleted();
then(delegate).should().onCompleted();
then(emitter).should().onSuccess(this);

/* then */
verifyNoMoreInteractions(delegate);
}
Expand Down Expand Up @@ -254,6 +340,42 @@ public void httpEventCallbacksCheckDisposal(Callable<AsyncHandler.State> httpEve
verifyNoMoreInteractions(delegate);
}

@DataProvider
public Object[][] variousEvents() {
return new Object[][]{
{named("onHostnameResolutionAttempt", () -> underTest.onHostnameResolutionAttempt("service:8080"))},
{named("onHostnameResolutionSuccess", () -> underTest.onHostnameResolutionSuccess("service:8080",
Collections.singletonList(remoteAddress)))},
{named("onHostnameResolutionFailure", () -> underTest.onHostnameResolutionFailure("service:8080", error))},
{named("onTcpConnectAttempt", () -> underTest.onTcpConnectAttempt(remoteAddress))},
{named("onTcpConnectSuccess", () -> underTest.onTcpConnectSuccess(remoteAddress, channel))},
{named("onTcpConnectFailure", () -> underTest.onTcpConnectFailure(remoteAddress, error))},
{named("onTlsHandshakeAttempt", () -> underTest.onTlsHandshakeAttempt())},
{named("onTlsHandshakeSuccess", () -> underTest.onTlsHandshakeSuccess(sslSession))},
{named("onTlsHandshakeFailure", () -> underTest.onTlsHandshakeFailure(error))},
{named("onConnectionPoolAttempt", () -> underTest.onConnectionPoolAttempt())},
{named("onConnectionPooled", () -> underTest.onConnectionPooled(channel))},
{named("onConnectionOffer", () -> underTest.onConnectionOffer(channel))},
{named("onRequestSend", () -> underTest.onRequestSend(null))},
{named("onRetry", () -> underTest.onRetry())},
};
}

@Test(dataProvider = "variousEvents")
public void variousEventCallbacksCheckDisposal(Runnable event) {
given(emitter.isDisposed()).willReturn(true);

/* when */
event.run();
/* then */
then(delegate).should(only()).onThrowable(isA(DisposedException.class));

/* when */
event.run();
/* then */
verifyNoMoreInteractions(delegate);
}

private final class UnderTest extends AbstractMaybeAsyncHandlerBridge<Object> {
UnderTest() {
super(AbstractMaybeAsyncHandlerBridgeTest.this.emitter);
Expand Down