Skip to content

Commit f76b225

Browse files
committed
Cross Cluster Search: preserve remote status code (#30976)
In case an error is returned when calling search_shards on a remote cluster, which will lead to throwing an exception in the coordinating node, we should make sure that the status code returned by the coordinating node is the same as the one returned by the remote cluster. Up until now a 500 - Internal Server Error was always returned. This commit changes this behaviour so that for instance if an index is not found, which causes an 404, a 404 is also returned by the coordinating node to the client. Closes #27461
1 parent 50ddd7d commit f76b225

File tree

5 files changed

+34
-15
lines changed

5 files changed

+34
-15
lines changed

qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
---
22
"Search with missing remote index pattern":
33
- do:
4-
catch: "request"
4+
catch: "missing"
55
search:
66
index: "my_remote_cluster:foo"
77

@@ -34,7 +34,7 @@
3434
- match: { aggregations.cluster.buckets.0.doc_count: 6 }
3535

3636
- do:
37-
catch: "request"
37+
catch: "missing"
3838
search:
3939
index: "my_remote_cluster:test_index,my_remote_cluster:foo"
4040
body:

server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference
216216
ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
217217
final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
218218
final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
219-
final AtomicReference<TransportException> transportException = new AtomicReference<>();
219+
final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
220220
for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
221221
final String clusterName = entry.getKey();
222222
RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
@@ -233,7 +233,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference
233233
public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
234234
searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
235235
if (responsesCountDown.countDown()) {
236-
TransportException exception = transportException.get();
236+
RemoteTransportException exception = transportException.get();
237237
if (exception == null) {
238238
listener.onResponse(searchShardsResponses);
239239
} else {
@@ -244,8 +244,8 @@ public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse)
244244

245245
@Override
246246
public void onFailure(Exception e) {
247-
TransportException exception = new TransportException("unable to communicate with remote cluster [" +
248-
clusterName + "]", e);
247+
RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster ["
248+
+ clusterName + "]", e);
249249
if (transportException.compareAndSet(null, exception) == false) {
250250
exception = transportException.accumulateAndGet(exception, (previous, current) -> {
251251
current.addSuppressed(previous);

server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.elasticsearch.transport;
2020

2121
import org.apache.lucene.store.AlreadyClosedException;
22-
import org.elasticsearch.core.internal.io.IOUtils;
2322
import org.elasticsearch.Build;
2423
import org.elasticsearch.Version;
2524
import org.elasticsearch.action.ActionListener;
@@ -51,7 +50,9 @@
5150
import org.elasticsearch.common.util.CancellableThreads;
5251
import org.elasticsearch.common.xcontent.XContentBuilder;
5352
import org.elasticsearch.common.xcontent.XContentFactory;
53+
import org.elasticsearch.core.internal.io.IOUtils;
5454
import org.elasticsearch.http.HttpInfo;
55+
import org.elasticsearch.index.IndexNotFoundException;
5556
import org.elasticsearch.mocksocket.MockServerSocket;
5657
import org.elasticsearch.test.ESTestCase;
5758
import org.elasticsearch.test.VersionUtils;
@@ -120,8 +121,12 @@ public static MockTransportService startTransport(
120121
try {
121122
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ClusterSearchShardsRequest::new, ThreadPool.Names.SAME,
122123
(request, channel) -> {
123-
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
124-
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
124+
if ("index_not_found".equals(request.preference())) {
125+
channel.sendResponse(new IndexNotFoundException("index"));
126+
} else {
127+
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
128+
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
129+
}
125130
});
126131
newService.registerRequestHandler(ClusterStateAction.NAME, ClusterStateRequest::new, ThreadPool.Names.SAME,
127132
(request, channel) -> {

server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.common.settings.Settings;
3131
import org.elasticsearch.common.transport.TransportAddress;
3232
import org.elasticsearch.core.internal.io.IOUtils;
33+
import org.elasticsearch.rest.RestStatus;
3334
import org.elasticsearch.test.ESTestCase;
3435
import org.elasticsearch.test.VersionUtils;
3536
import org.elasticsearch.test.transport.MockTransportService;
@@ -469,7 +470,6 @@ public void onFailure(Exception e) {
469470
assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage());
470471
}
471472
{
472-
473473
logger.info("closing all source nodes");
474474
// close all targets and check for the transport level failure path
475475
IOUtils.close(c1N1, c1N2, c2N1, c2N2);
@@ -559,7 +559,20 @@ public void testCollectSearchShards() throws Exception {
559559
assertEquals(1, shardsResponse.getNodes().length);
560560
}
561561
}
562-
562+
{
563+
final CountDownLatch latch = new CountDownLatch(1);
564+
AtomicReference<Map<String, ClusterSearchShardsResponse>> response = new AtomicReference<>();
565+
AtomicReference<Exception> failure = new AtomicReference<>();
566+
remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found",
567+
null, remoteIndicesByCluster,
568+
new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch));
569+
assertTrue(latch.await(1, TimeUnit.SECONDS));
570+
assertNull(response.get());
571+
assertNotNull(failure.get());
572+
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
573+
RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get();
574+
assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status());
575+
}
563576
int numDisconnectedClusters = randomIntBetween(1, numClusters);
564577
Set<DiscoveryNode> disconnectedNodes = new HashSet<>(numDisconnectedClusters);
565578
Set<Integer> disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters);
@@ -593,8 +606,9 @@ public void onNodeDisconnected(DiscoveryNode node) {
593606
assertTrue(latch.await(1, TimeUnit.SECONDS));
594607
assertNull(response.get());
595608
assertNotNull(failure.get());
596-
assertThat(failure.get(), instanceOf(TransportException.class));
597-
assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster"));
609+
assertThat(failure.get(), instanceOf(RemoteTransportException.class));
610+
assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster ["));
611+
assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class));
598612
}
599613

600614
//setting skip_unavailable to true for all the disconnected clusters will make the request succeed again

x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,13 @@ teardown:
5656
- match: { hits.total: 0 }
5757

5858
- do:
59-
catch: "request"
59+
catch: "forbidden"
6060
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
6161
search:
6262
index: "*:foo-bar"
6363

6464
- do:
65-
catch: "request"
65+
catch: "forbidden"
6666
headers: { Authorization: "Basic am9lOnMza3JpdA==" }
6767
search:
6868
index: "my_remote_cluster:foo-bar"

0 commit comments

Comments
 (0)