Skip to content

Commit 16e4e7a

Browse files
authored
Node selector per client rather than per request (#31471)
We have made node selectors configurable per request, but all of other language clients don't allow for that. A good reason not to do so, is that having a different node selector per request breaks round-robin. This commit makes NodeSelector configurable only at client initialization. It also improves the docs on this matter, important given that a single node selector can still affect round-robin.
1 parent 59e7c64 commit 16e4e7a

File tree

20 files changed

+208
-148
lines changed

20 files changed

+208
-148
lines changed

client/rest/src/main/java/org/elasticsearch/client/NodeSelector.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
/**
2525
* Selects nodes that can receive requests. Used to keep requests away
2626
* from master nodes or to send them to nodes with a particular attribute.
27-
* Use with {@link RequestOptions.Builder#setNodeSelector(NodeSelector)}.
27+
* Use with {@link RestClientBuilder#setNodeSelector(NodeSelector)}.
2828
*/
2929
public interface NodeSelector {
3030
/**
@@ -68,7 +68,7 @@ public String toString() {
6868
* have the {@code master} role OR it has the data {@code data}
6969
* role.
7070
*/
71-
NodeSelector NOT_MASTER_ONLY = new NodeSelector() {
71+
NodeSelector SKIP_DEDICATED_MASTERS = new NodeSelector() {
7272
@Override
7373
public void select(Iterable<Node> nodes) {
7474
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
@@ -84,7 +84,7 @@ public void select(Iterable<Node> nodes) {
8484

8585
@Override
8686
public String toString() {
87-
return "NOT_MASTER_ONLY";
87+
return "SKIP_DEDICATED_MASTERS";
8888
}
8989
};
9090
}

client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java

Lines changed: 4 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,18 @@
3737
*/
3838
public final class RequestOptions {
3939
public static final RequestOptions DEFAULT = new Builder(
40-
Collections.<Header>emptyList(), NodeSelector.ANY,
41-
HeapBufferedResponseConsumerFactory.DEFAULT).build();
40+
Collections.<Header>emptyList(), HeapBufferedResponseConsumerFactory.DEFAULT).build();
4241

4342
private final List<Header> headers;
44-
private final NodeSelector nodeSelector;
4543
private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
4644

4745
private RequestOptions(Builder builder) {
4846
this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
49-
this.nodeSelector = builder.nodeSelector;
5047
this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
5148
}
5249

5350
public Builder toBuilder() {
54-
Builder builder = new Builder(headers, nodeSelector, httpAsyncResponseConsumerFactory);
55-
return builder;
51+
return new Builder(headers, httpAsyncResponseConsumerFactory);
5652
}
5753

5854
/**
@@ -62,14 +58,6 @@ public List<Header> getHeaders() {
6258
return headers;
6359
}
6460

65-
/**
66-
* The selector that chooses which nodes are valid destinations for
67-
* {@link Request}s with these options.
68-
*/
69-
public NodeSelector getNodeSelector() {
70-
return nodeSelector;
71-
}
72-
7361
/**
7462
* The {@link HttpAsyncResponseConsumerFactory} used to create one
7563
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the
@@ -93,9 +81,6 @@ public String toString() {
9381
b.append(headers.get(h).toString());
9482
}
9583
}
96-
if (nodeSelector != NodeSelector.ANY) {
97-
b.append(", nodeSelector=").append(nodeSelector);
98-
}
9984
if (httpAsyncResponseConsumerFactory != HttpAsyncResponseConsumerFactory.DEFAULT) {
10085
b.append(", consumerFactory=").append(httpAsyncResponseConsumerFactory);
10186
}
@@ -113,24 +98,20 @@ public boolean equals(Object obj) {
11398

11499
RequestOptions other = (RequestOptions) obj;
115100
return headers.equals(other.headers)
116-
&& nodeSelector.equals(other.nodeSelector)
117101
&& httpAsyncResponseConsumerFactory.equals(other.httpAsyncResponseConsumerFactory);
118102
}
119103

120104
@Override
121105
public int hashCode() {
122-
return Objects.hash(headers, nodeSelector, httpAsyncResponseConsumerFactory);
106+
return Objects.hash(headers, httpAsyncResponseConsumerFactory);
123107
}
124108

125109
public static class Builder {
126110
private final List<Header> headers;
127-
private NodeSelector nodeSelector;
128111
private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
129112

130-
private Builder(List<Header> headers, NodeSelector nodeSelector,
131-
HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
113+
private Builder(List<Header> headers, HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
132114
this.headers = new ArrayList<>(headers);
133-
this.nodeSelector = nodeSelector;
134115
this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
135116
}
136117

@@ -150,14 +131,6 @@ public void addHeader(String name, String value) {
150131
this.headers.add(new ReqHeader(name, value));
151132
}
152133

153-
/**
154-
* Configure the selector that chooses which nodes are valid
155-
* destinations for {@link Request}s with these options
156-
*/
157-
public void setNodeSelector(NodeSelector nodeSelector) {
158-
this.nodeSelector = Objects.requireNonNull(nodeSelector, "nodeSelector cannot be null");
159-
}
160-
161134
/**
162135
* Set the {@link HttpAsyncResponseConsumerFactory} used to create one
163136
* {@link HttpAsyncResponseConsumer} callback per retry. Controls how the

client/rest/src/main/java/org/elasticsearch/client/RestClient.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import org.apache.http.nio.protocol.HttpAsyncResponseConsumer;
4949
import org.elasticsearch.client.DeadHostState.TimeSupplier;
5050

51+
import javax.net.ssl.SSLHandshakeException;
5152
import java.io.Closeable;
5253
import java.io.IOException;
5354
import java.net.ConnectException;
@@ -74,7 +75,6 @@
7475
import java.util.concurrent.TimeUnit;
7576
import java.util.concurrent.atomic.AtomicInteger;
7677
import java.util.concurrent.atomic.AtomicReference;
77-
import javax.net.ssl.SSLHandshakeException;
7878

7979
import static java.util.Collections.singletonList;
8080

@@ -108,15 +108,17 @@ public class RestClient implements Closeable {
108108
private final AtomicInteger lastNodeIndex = new AtomicInteger(0);
109109
private final ConcurrentMap<HttpHost, DeadHostState> blacklist = new ConcurrentHashMap<>();
110110
private final FailureListener failureListener;
111+
private final NodeSelector nodeSelector;
111112
private volatile NodeTuple<List<Node>> nodeTuple;
112113

113114
RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders,
114-
List<Node> nodes, String pathPrefix, FailureListener failureListener) {
115+
List<Node> nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector) {
115116
this.client = client;
116117
this.maxRetryTimeoutMillis = maxRetryTimeoutMillis;
117118
this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders));
118119
this.failureListener = failureListener;
119120
this.pathPrefix = pathPrefix;
121+
this.nodeSelector = nodeSelector;
120122
setNodes(nodes);
121123
}
122124

@@ -146,7 +148,7 @@ public static RestClientBuilder builder(HttpHost... hosts) {
146148
/**
147149
* Replaces the hosts with which the client communicates.
148150
*
149-
* @deprecated prefer {@link setNodes} because it allows you
151+
* @deprecated prefer {@link #setNodes(Collection)} because it allows you
150152
* to set metadata for use with {@link NodeSelector}s
151153
*/
152154
@Deprecated
@@ -180,8 +182,8 @@ private static List<Node> hostsToNodes(HttpHost[] hosts) {
180182
throw new IllegalArgumentException("hosts must not be null nor empty");
181183
}
182184
List<Node> nodes = new ArrayList<>(hosts.length);
183-
for (int i = 0; i < hosts.length; i++) {
184-
nodes.add(new Node(hosts[i]));
185+
for (HttpHost host : hosts) {
186+
nodes.add(new Node(host));
185187
}
186188
return nodes;
187189
}
@@ -509,7 +511,7 @@ void performRequestAsyncNoCatch(Request request, ResponseListener listener) thro
509511
setHeaders(httpRequest, request.getOptions().getHeaders());
510512
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
511513
long startTime = System.nanoTime();
512-
performRequestAsync(startTime, nextNode(request.getOptions().getNodeSelector()), httpRequest, ignoreErrorCodes,
514+
performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
513515
request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
514516
}
515517

@@ -611,7 +613,7 @@ private void setHeaders(HttpRequest httpRequest, Collection<Header> requestHeade
611613
* that is closest to being revived.
612614
* @throws IOException if no nodes are available
613615
*/
614-
private NodeTuple<Iterator<Node>> nextNode(NodeSelector nodeSelector) throws IOException {
616+
private NodeTuple<Iterator<Node>> nextNode() throws IOException {
615617
NodeTuple<List<Node>> nodeTuple = this.nodeTuple;
616618
List<Node> hosts = selectHosts(nodeTuple, blacklist, lastNodeIndex, nodeSelector);
617619
return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache);

client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public final class RestClientBuilder {
5555
private HttpClientConfigCallback httpClientConfigCallback;
5656
private RequestConfigCallback requestConfigCallback;
5757
private String pathPrefix;
58+
private NodeSelector nodeSelector = NodeSelector.ANY;
5859

5960
/**
6061
* Creates a new builder instance and sets the hosts that the client will send requests to.
@@ -173,6 +174,16 @@ public RestClientBuilder setPathPrefix(String pathPrefix) {
173174
return this;
174175
}
175176

177+
/**
178+
* Sets the {@link NodeSelector} to be used for all requests.
179+
* @throws NullPointerException if the provided nodeSelector is null
180+
*/
181+
public RestClientBuilder setNodeSelector(NodeSelector nodeSelector) {
182+
Objects.requireNonNull(nodeSelector, "nodeSelector must not be null");
183+
this.nodeSelector = nodeSelector;
184+
return this;
185+
}
186+
176187
/**
177188
* Creates a new {@link RestClient} based on the provided configuration.
178189
*/
@@ -186,7 +197,8 @@ public CloseableHttpAsyncClient run() {
186197
return createHttpClient();
187198
}
188199
});
189-
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, pathPrefix, failureListener);
200+
RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes,
201+
pathPrefix, failureListener, nodeSelector);
190202
httpClient.start();
191203
return restClient;
192204
}

client/rest/src/test/java/org/elasticsearch/client/NodeSelectorTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public void testNotMasterOnly() {
5959
Collections.shuffle(nodes, getRandom());
6060
List<Node> expected = new ArrayList<>(nodes);
6161
expected.remove(masterOnly);
62-
NodeSelector.NOT_MASTER_ONLY.select(nodes);
62+
NodeSelector.SKIP_DEDICATED_MASTERS.select(nodes);
6363
assertEquals(expected, nodes);
6464
}
6565

client/rest/src/test/java/org/elasticsearch/client/RequestOptionsTests.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,6 @@ static RequestOptions.Builder randomBuilder() {
114114
}
115115
}
116116

117-
if (randomBoolean()) {
118-
builder.setNodeSelector(mock(NodeSelector.class));
119-
}
120-
121117
if (randomBoolean()) {
122118
builder.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(1));
123119
}
@@ -131,15 +127,12 @@ private static RequestOptions copy(RequestOptions options) {
131127

132128
private static RequestOptions mutate(RequestOptions options) {
133129
RequestOptions.Builder mutant = options.toBuilder();
134-
int mutationType = between(0, 2);
130+
int mutationType = between(0, 1);
135131
switch (mutationType) {
136132
case 0:
137133
mutant.addHeader("extra", "m");
138134
return mutant.build();
139135
case 1:
140-
mutant.setNodeSelector(mock(NodeSelector.class));
141-
return mutant.build();
142-
case 2:
143136
mutant.setHttpAsyncResponseConsumerFactory(new HeapBufferedResponseConsumerFactory(5));
144137
return mutant.build();
145138
default:

client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,15 @@ public static void startHttpServer() throws Exception {
7575
httpServers[i] = httpServer;
7676
httpHosts[i] = new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort());
7777
}
78-
restClient = buildRestClient();
78+
restClient = buildRestClient(NodeSelector.ANY);
7979
}
8080

81-
private static RestClient buildRestClient() {
81+
private static RestClient buildRestClient(NodeSelector nodeSelector) {
8282
RestClientBuilder restClientBuilder = RestClient.builder(httpHosts);
8383
if (pathPrefix.length() > 0) {
8484
restClientBuilder.setPathPrefix((randomBoolean() ? "/" : "") + pathPrefixWithoutLeadingSlash);
8585
}
86+
restClientBuilder.setNodeSelector(nodeSelector);
8687
return restClientBuilder.build();
8788
}
8889

@@ -199,29 +200,28 @@ public void onFailure(Exception exception) {
199200
* test what happens after calling
200201
*/
201202
public void testNodeSelector() throws IOException {
202-
Request request = new Request("GET", "/200");
203-
RequestOptions.Builder options = request.getOptions().toBuilder();
204-
options.setNodeSelector(firstPositionNodeSelector());
205-
request.setOptions(options);
206-
int rounds = between(1, 10);
207-
for (int i = 0; i < rounds; i++) {
208-
/*
209-
* Run the request more than once to verify that the
210-
* NodeSelector overrides the round robin behavior.
211-
*/
212-
if (stoppedFirstHost) {
213-
try {
214-
restClient.performRequest(request);
215-
fail("expected to fail to connect");
216-
} catch (ConnectException e) {
217-
// Windows isn't consistent here. Sometimes the message is even null!
218-
if (false == System.getProperty("os.name").startsWith("Windows")) {
219-
assertEquals("Connection refused", e.getMessage());
203+
try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) {
204+
Request request = new Request("GET", "/200");
205+
int rounds = between(1, 10);
206+
for (int i = 0; i < rounds; i++) {
207+
/*
208+
* Run the request more than once to verify that the
209+
* NodeSelector overrides the round robin behavior.
210+
*/
211+
if (stoppedFirstHost) {
212+
try {
213+
restClient.performRequest(request);
214+
fail("expected to fail to connect");
215+
} catch (ConnectException e) {
216+
// Windows isn't consistent here. Sometimes the message is even null!
217+
if (false == System.getProperty("os.name").startsWith("Windows")) {
218+
assertEquals("Connection refused", e.getMessage());
219+
}
220220
}
221+
} else {
222+
Response response = restClient.performRequest(request);
223+
assertEquals(httpHosts[0], response.getHost());
221224
}
222-
} else {
223-
Response response = restClient.performRequest(request);
224-
assertEquals(httpHosts[0], response.getHost());
225225
}
226226
}
227227
}

0 commit comments

Comments
 (0)