Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* Selects nodes that can receive requests. Used to keep requests away
* from master nodes or to send them to nodes with a particular attribute.
* Use with {@link RequestOptions.Builder#setNodeSelector(NodeSelector)}.
* Use with {@link RestClientBuilder#setNodeSelector(NodeSelector)}.
*/
public interface NodeSelector {
/**
Expand Down Expand Up @@ -68,7 +68,7 @@ public String toString() {
* have the {@code master} role OR it has the data {@code data}
* role.
*/
NodeSelector NOT_MASTER_ONLY = new NodeSelector() {
NodeSelector SKIP_DEDICATED_MASTERS = new NodeSelector() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
Expand All @@ -84,7 +84,7 @@ public void select(Iterable<Node> nodes) {

@Override
public String toString() {
return "NOT_MASTER_ONLY";
return "SKIP_DEDICATED_MASTERS";
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,18 @@
*/
public final class RequestOptions {
public static final RequestOptions DEFAULT = new Builder(
Collections.<Header>emptyList(), NodeSelector.ANY,
HeapBufferedResponseConsumerFactory.DEFAULT).build();
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT).build();

private final List<Header> headers;
private final NodeSelector nodeSelector;
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;

private RequestOptions(Builder builder) {
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
this.nodeSelector = builder.nodeSelector;
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
}

public Builder toBuilder() {
Builder builder = new Builder(headers, nodeSelector, httpAsyncResponseConsumerFactory);
return builder;
return new Builder(headers, httpAsyncResponseConsumerFactory);
}

/**
Expand All @@ -62,14 +58,6 @@ public List<Header> getHeaders() {
return headers;
}

/**
* The selector that chooses which nodes are valid destinations for
* {@link Request}s with these options.
*/
public NodeSelector getNodeSelector() {
return nodeSelector;
}

/**
* The {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
Expand All @@ -93,9 +81,6 @@ public String toString() {
b.append(headers.get(h).toString());
}
}
if (nodeSelector != NodeSelector.ANY) {
b.append(", nodeSelector=").append(nodeSelector);
}
if (httpAsyncResponseConsumerFactory != HttpAsyncResponseConsumerFactory.DEFAULT) {
b.append(", consumerFactory=").append(httpAsyncResponseConsumerFactory);
}
Expand All @@ -113,24 +98,20 @@ public boolean equals(Object obj) {

RequestOptions other = (RequestOptions) obj;
return headers.equals(other.headers)
&& nodeSelector.equals(other.nodeSelector)
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory);
}

@Override
public int hashCode() {
return Objects.hash(headers, nodeSelector, httpAsyncResponseConsumerFactory);
return Objects.hash(headers, httpAsyncResponseConsumerFactory);
}

public static class Builder {
private final List<Header> headers;
private NodeSelector nodeSelector;
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;

private Builder(List<Header> headers, NodeSelector nodeSelector,
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
this.headers = new ArrayList<>(headers);
this.nodeSelector = nodeSelector;
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
}

Expand All @@ -150,14 +131,6 @@ public void addHeader(String name, String value) {
this.headers.add(new ReqHeader(name, value));
}

/**
* Configure the selector that chooses which nodes are valid
* destinations for {@link Request}s with these options
*/
public void setNodeSelector(NodeSelector nodeSelector) {
this.nodeSelector = Objects.requireNonNull(nodeSelector, "nodeSelector cannot be null");
}

/**
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
import org.elasticsearch.client.DeadHostState.TimeSupplier;

import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
Expand All @@ -74,7 +75,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLHandshakeException;

import static java.util.Collections.singletonList;

Expand Down Expand Up @@ -108,15 +108,17 @@ public class RestClient implements Closeable {
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
private final FailureListener failureListener;
private final NodeSelector nodeSelector;
private volatile NodeTuple<List<Node>> nodeTuple;

RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
List<Node> nodes, String pathPrefix, FailureListener failureListener) {
List<Node> nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector) {
this.client = client;
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
this.failureListener = failureListener;
this.pathPrefix = pathPrefix;
this.nodeSelector = nodeSelector;
setNodes(nodes);
}

Expand Down Expand Up @@ -146,7 +148,7 @@ public static RestClientBuilder builder(HttpHost... hosts) {
/**
* Replaces the hosts with which the client communicates.
*
* @deprecated prefer {@link setNodes} because it allows you
* @deprecated prefer {@link #setNodes(Collection)} because it allows you
* to set metadata for use with {@link NodeSelector}s
*/
@Deprecated
Expand Down Expand Up @@ -180,8 +182,8 @@ private static List<Node> hostsToNodes(HttpHost[] hosts) {
throw new IllegalArgumentException("hosts must not be null nor empty");
}
List<Node> nodes = new ArrayList<>(hosts.length);
for (int i = 0; i < hosts.length; i++) {
nodes.add(new Node(hosts[i]));
for (HttpHost host : hosts) {
nodes.add(new Node(host));
}
return nodes;
}
Expand Down Expand Up @@ -509,7 +511,7 @@ void performRequestAsyncNoCatch(Request request, ResponseListener listener) thro
setHeaders(httpRequest, request.getOptions().getHeaders());
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
long startTime = System.nanoTime();
performRequestAsync(startTime, nextNode(request.getOptions().getNodeSelector()), httpRequest, ignoreErrorCodes,
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
}

Expand Down Expand Up @@ -611,7 +613,7 @@ private void setHeaders(HttpRequest httpRequest, Collection<Header> requestHeade
* that is closest to being revived.
* @throws IOException if no nodes are available
*/
private NodeTuple<Iterator<Node>> nextNode(NodeSelector nodeSelector) throws IOException {
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public final class RestClientBuilder {
private HttpClientConfigCallback httpClientConfigCallback;
private RequestConfigCallback requestConfigCallback;
private String pathPrefix;
private NodeSelector nodeSelector = NodeSelector.ANY;

/**
* Creates a new builder instance and sets the hosts that the client will send requests to.
Expand Down Expand Up @@ -173,6 +174,16 @@ public RestClientBuilder setPathPrefix(String pathPrefix) {
return this;
}

/**
* Sets the {@link NodeSelector} to be used for all requests.
* @throws NullPointerException if the provided nodeSelector is null
*/
public RestClientBuilder setNodeSelector(NodeSelector nodeSelector) {
Objects.requireNonNull(nodeSelector, "nodeSelector must not be null");
this.nodeSelector = nodeSelector;
return this;
}

/**
* Creates a new {@link RestClient} based on the provided configuration.
*/
Expand All @@ -186,7 +197,8 @@ public CloseableHttpAsyncClient run() {
return createHttpClient();
}
});
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, pathPrefix, failureListener);
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
pathPrefix, failureListener, nodeSelector);
httpClient.start();
return restClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testNotMasterOnly() {
Collections.shuffle(nodes, getRandom());
List<Node> expected = new ArrayList<>(nodes);
expected.remove(masterOnly);
NodeSelector.NOT_MASTER_ONLY.select(nodes);
NodeSelector.SKIP_DEDICATED_MASTERS.select(nodes);
assertEquals(expected, nodes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,6 @@ static RequestOptions.Builder randomBuilder() {
}
}

if (randomBoolean()) {
builder.setNodeSelector(mock(NodeSelector.class));
}

if (randomBoolean()) {
builder.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(1));
}
Expand All @@ -131,15 +127,12 @@ private static RequestOptions copy(RequestOptions options) {

private static RequestOptions mutate(RequestOptions options) {
RequestOptions.Builder mutant = options.toBuilder();
int mutationType = between(0, 2);
int mutationType = between(0, 1);
switch (mutationType) {
case 0:
mutant.addHeader("extra", "m");
return mutant.build();
case 1:
mutant.setNodeSelector(mock(NodeSelector.class));
return mutant.build();
case 2:
mutant.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(5));
return mutant.build();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ public static void startHttpServer() throws Exception {
httpServers[i] = httpServer;
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
}
restClient = buildRestClient();
restClient = buildRestClient(NodeSelector.ANY);
}

private static RestClient buildRestClient() {
private static RestClient buildRestClient(NodeSelector nodeSelector) {
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
if (pathPrefix.length() > 0) {
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
}
restClientBuilder.setNodeSelector(nodeSelector);
return restClientBuilder.build();
}

Expand Down Expand Up @@ -199,29 +200,28 @@ public void onFailure(Exception exception) {
* test what happens after calling
*/
public void testNodeSelector() throws IOException {
Request request = new Request("GET", "/200");
RequestOptions.Builder options = request.getOptions().toBuilder();
options.setNodeSelector(firstPositionNodeSelector());
request.setOptions(options);
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
* Run the request more than once to verify that the
* NodeSelector overrides the round robin behavior.
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// Windows isn't consistent here. Sometimes the message is even null!
if (false == System.getProperty("os.name").startsWith("Windows")) {
assertEquals("Connection refused", e.getMessage());
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
Request request = new Request("GET", "/200");
int rounds = between(1, 10);
for (int i = 0; i < rounds; i++) {
/*
* Run the request more than once to verify that the
* NodeSelector overrides the round robin behavior.
*/
if (stoppedFirstHost) {
try {
restClient.performRequest(request);
fail("expected to fail to connect");
} catch (ConnectException e) {
// Windows isn't consistent here. Sometimes the message is even null!
if (false == System.getProperty("os.name").startsWith("Windows")) {
assertEquals("Connection refused", e.getMessage());
}
}
} else {
Response response = restClient.performRequest(request);
assertEquals(httpHosts[0], response.getHost());
}
} else {
Response response = restClient.performRequest(request);
assertEquals(httpHosts[0], response.getHost());
}
}
}
Expand Down
Loading