diff --git a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml index c40e7be1c643e..ac47fe000dd0d 100644 --- a/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml +++ b/qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml @@ -1,7 +1,7 @@ --- "Search with missing remote index pattern": - do: - catch: "request" + catch: "missing" search: index: "my_remote_cluster:foo" @@ -34,7 +34,7 @@ - match: { aggregations.cluster.buckets.0.doc_count: 6 } - do: - catch: "request" + catch: "missing" search: index: "my_remote_cluster:test_index,my_remote_cluster:foo" body: diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 7673a02b2d03d..a07de63d53734 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -215,7 +215,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference ActionListener> listener) { final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size()); final Map searchShardsResponses = new ConcurrentHashMap<>(); - final AtomicReference transportException = new AtomicReference<>(); + final AtomicReference transportException = new AtomicReference<>(); for (Map.Entry entry : remoteIndicesByCluster.entrySet()) { final String clusterName = entry.getKey(); RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName); @@ -232,7 +232,7 @@ public void collectSearchShards(IndicesOptions indicesOptions, String preference public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) { searchShardsResponses.put(clusterName, clusterSearchShardsResponse); if (responsesCountDown.countDown()) { - TransportException exception = transportException.get(); + RemoteTransportException exception = transportException.get(); if (exception == null) { listener.onResponse(searchShardsResponses); } else { @@ -243,8 +243,8 @@ public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) @Override public void onFailure(Exception e) { - TransportException exception = new TransportException("unable to communicate with remote cluster [" + - clusterName + "]", e); + RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster [" + + clusterName + "]", e); if (transportException.compareAndSet(null, exception) == false) { exception = transportException.accumulateAndGet(exception, (previous, current) -> { current.addSuppressed(previous); diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java index 69096677664b3..0739ff5633bec 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterConnectionTests.java @@ -19,15 +19,9 @@ package org.elasticsearch.transport; import org.apache.lucene.store.AlreadyClosedException; -import org.elasticsearch.core.internal.io.IOUtils; -import org.elasticsearch.Build; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.LatchedActionListener; -import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoAction; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest; @@ -42,17 +36,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.http.HttpInfo; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.mocksocket.MockServerSocket; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; @@ -121,8 +114,12 @@ public static MockTransportService startTransport( try { newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new, (request, channel) -> { - channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], - knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); + if ("index_not_found".equals(request.preference())) { + channel.sendResponse(new IndexNotFoundException("index")); + } else { + channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0], + knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap())); + } }); newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new, (request, channel) -> { diff --git a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java index 5529f98af3342..03d76b5a953c6 100644 --- a/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java +++ b/server/src/test/java/org/elasticsearch/transport/RemoteClusterServiceTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.transport.MockTransportService; @@ -469,7 +470,6 @@ public void onFailure(Exception e) { assertEquals("no such remote cluster: [no such cluster]", ex.get().getMessage()); } { - logger.info("closing all source nodes"); // close all targets and check for the transport level failure path IOUtils.close(c1N1, c1N2, c2N1, c2N2); @@ -559,7 +559,20 @@ public void testCollectSearchShards() throws Exception { assertEquals(1, shardsResponse.getNodes().length); } } - + { + final CountDownLatch latch = new CountDownLatch(1); + AtomicReference> response = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + remoteClusterService.collectSearchShards(IndicesOptions.lenientExpandOpen(), "index_not_found", + null, remoteIndicesByCluster, + new LatchedActionListener<>(ActionListener.wrap(response::set, failure::set), latch)); + assertTrue(latch.await(1, TimeUnit.SECONDS)); + assertNull(response.get()); + assertNotNull(failure.get()); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + RemoteTransportException remoteTransportException = (RemoteTransportException) failure.get(); + assertEquals(RestStatus.NOT_FOUND, remoteTransportException.status()); + } int numDisconnectedClusters = randomIntBetween(1, numClusters); Set disconnectedNodes = new HashSet<>(numDisconnectedClusters); Set disconnectedNodesIndices = new HashSet<>(numDisconnectedClusters); @@ -593,8 +606,9 @@ public void onNodeDisconnected(DiscoveryNode node) { assertTrue(latch.await(1, TimeUnit.SECONDS)); assertNull(response.get()); assertNotNull(failure.get()); - assertThat(failure.get(), instanceOf(TransportException.class)); - assertThat(failure.get().getMessage(), containsString("unable to communicate with remote cluster")); + assertThat(failure.get(), instanceOf(RemoteTransportException.class)); + assertThat(failure.get().getMessage(), containsString("error while communicating with remote cluster [")); + assertThat(failure.get().getCause(), instanceOf(NodeDisconnectedException.class)); } //setting skip_unavailable to true for all the disconnected clusters will make the request succeed again diff --git a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml index 9c445f418daf2..0b224518782c3 100644 --- a/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml +++ b/x-pack/qa/multi-cluster-search-security/src/test/resources/rest-api-spec/test/multi_cluster/50_missing.yml @@ -56,13 +56,13 @@ teardown: - match: { hits.total: 0 } - do: - catch: "request" + catch: "forbidden" headers: { Authorization: "Basic am9lOnMza3JpdA==" } search: index: "*:foo-bar" - do: - catch: "request" + catch: "forbidden" headers: { Authorization: "Basic am9lOnMza3JpdA==" } search: index: "my_remote_cluster:foo-bar"