From fe283cdbce1f19809fbe9cf408f797f6f45a4bb1 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 18 Dec 2018 15:39:36 -0700 Subject: [PATCH 1/7] WIP --- .../elasticsearch/transport/RemoteClient.java | 27 +++++++++++++++++++ .../transport/RemoteClusterAwareClient.java | 25 ++++++++++++++--- .../transport/RemoteClusterService.java | 3 +-- 3 files changed, 49 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/RemoteClient.java diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClient.java new file mode 100644 index 0000000000000..3f9caa11e4ebb --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.transport; + +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.node.DiscoveryNode; + +public interface RemoteClient extends Client { + + Client preferredNode(DiscoveryNode node); +} diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 8e72e6d5768f1..4052ad493ca8c 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -25,41 +25,58 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -final class RemoteClusterAwareClient extends AbstractClient { +final class RemoteClusterAwareClient extends AbstractClient implements RemoteClient { private final TransportService service; private final String clusterAlias; private final RemoteClusterService remoteClusterService; + private final DiscoveryNode discoveryNode; RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) { + this(settings, threadPool, service, clusterAlias, null); + } + + private RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias, + DiscoveryNode discoveryNode) { super(settings, threadPool); this.service = service; this.clusterAlias = clusterAlias; this.remoteClusterService = service.getRemoteClusterService(); + this.discoveryNode = discoveryNode; } @Override protected void doExecute(Action action, Request request, ActionListener listener) { remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { - Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); + Transport.Connection connection; + if (discoveryNode == null) { + connection = remoteClusterService.getConnection(clusterAlias); + } else { + connection = remoteClusterService.getConnection(discoveryNode, clusterAlias); + } service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, listener::onFailure)); } - @Override public void close() { // do nothing } @Override - public Client getRemoteClusterClient(String clusterAlias) { + public RemoteClient getRemoteClusterClient(String clusterAlias) { return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); } + + @Override + public Client preferredNode(DiscoveryNode node) { + return new RemoteClusterAwareClient(settings, threadPool(), service, clusterAlias, node); + } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index cb802f13fdb50..93b2dabfe9f0d 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -28,7 +28,6 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -492,7 +491,7 @@ public void onFailure(Exception e) { * * @throws IllegalArgumentException if the given clusterAlias doesn't exist */ - public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { + public RemoteClient getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) { throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]"); } From 7af304fb64001dcdda743b17437e6a900aaf33f8 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Tue, 18 Dec 2018 17:28:34 -0700 Subject: [PATCH 2/7] WIP --- .../java/org/elasticsearch/client/Client.java | 3 +- .../elasticsearch/client/FilterClient.java | 3 +- .../elasticsearch/client/node/NodeClient.java | 3 +- .../elasticsearch/transport/RemoteClient.java | 8 +- .../transport/RemoteClusterAwareClient.java | 3 +- .../ClearCcrRestoreSessionAction.java | 95 ++++--------------- .../ClearCcrRestoreSessionRequest.java | 64 ++++--------- .../PutCcrRestoreSessionAction.java | 19 ++-- .../xpack/ccr/repository/CcrRepository.java | 15 ++- .../action/AutoFollowCoordinatorTests.java | 14 +-- .../RemoteClusterLicenseCheckerTests.java | 9 +- 11 files changed, 83 insertions(+), 153 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index 07871709f5726..26719ac831f33 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -67,6 +67,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.transport.RemoteClient; import java.util.Map; @@ -456,7 +457,7 @@ public interface Client extends ElasticsearchClient, Releasable { * @throws IllegalArgumentException if the given clusterAlias doesn't exist * @throws UnsupportedOperationException if this functionality is not available on this client. */ - default Client getRemoteClusterClient(String clusterAlias) { + default RemoteClient getRemoteClusterClient(String clusterAlias) { throw new UnsupportedOperationException("this client doesn't support remote cluster connections"); } } diff --git a/server/src/main/java/org/elasticsearch/client/FilterClient.java b/server/src/main/java/org/elasticsearch/client/FilterClient.java index b4230710414be..0fbc83bf6ab6c 100644 --- a/server/src/main/java/org/elasticsearch/client/FilterClient.java +++ b/server/src/main/java/org/elasticsearch/client/FilterClient.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClient; /** * A {@link Client} that contains another {@link Client} which it @@ -73,7 +74,7 @@ protected Client in() { } @Override - public Client getRemoteClusterClient(String clusterAlias) { + public RemoteClient getRemoteClusterClient(String clusterAlias) { return in.getRemoteClusterClient(clusterAlias); } } diff --git a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java index 0ad863c936741..7a602b19e60c6 100644 --- a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -31,6 +31,7 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.transport.RemoteClusterService; import java.util.Map; @@ -119,7 +120,7 @@ > TransportAction transportAction(Action action) { } @Override - public Client getRemoteClusterClient(String clusterAlias) { + public RemoteClient getRemoteClusterClient(String clusterAlias) { return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClient.java index 3f9caa11e4ebb..286b6e47015d8 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClient.java @@ -23,5 +23,11 @@ public interface RemoteClient extends Client { - Client preferredNode(DiscoveryNode node); + /** + * Returns a remote client instance that will send requests to the specified node. + * + * @param node to route request to + * @return remote client + */ + RemoteClient routedToNode(DiscoveryNode node); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 4052ad493ca8c..02179caa6c495 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; @@ -76,7 +75,7 @@ public RemoteClient getRemoteClusterClient(String clusterAlias) { } @Override - public Client preferredNode(DiscoveryNode node) { + public RemoteClient routedToNode(DiscoveryNode node) { return new RemoteClusterAwareClient(settings, threadPool(), service, clusterAlias, node); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 33b8b415d8362..686e029cfbc88 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -7,24 +7,17 @@ package org.elasticsearch.xpack.ccr.action.repositories; import org.elasticsearch.action.Action; -import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.nodes.BaseNodeResponse; -import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.action.support.nodes.TransportNodesAction; -import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; -import java.io.IOException; -import java.util.List; - public class ClearCcrRestoreSessionAction extends Action { public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction(); @@ -36,86 +29,40 @@ private ClearCcrRestoreSessionAction() { @Override public ClearCcrRestoreSessionResponse newResponse() { - return new ClearCcrRestoreSessionResponse(); + throw new UnsupportedOperationException(); } - public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { + @Override + public Writeable.Reader getResponseReader() { + return ClearCcrRestoreSessionResponse::new; + } + + public static class TransportDeleteCcrRestoreSessionAction + extends HandledTransportAction { private final CcrRestoreSourceService ccrRestoreService; @Inject - public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters, - TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new, - ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class); + public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService, + CcrRestoreSourceService ccrRestoreService) { + super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new); this.ccrRestoreService = ccrRestoreService; } @Override - protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List responses, - List failures) { - return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures); - } - - @Override - protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) { - return request.getRequest(); - } - - @Override - protected Response newNodeResponse() { - return new Response(); - } - - @Override - protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) { + protected void doExecute(Task task, ClearCcrRestoreSessionRequest request, + ActionListener listener) { ccrRestoreService.closeSession(request.getSessionUUID()); - return new Response(clusterService.localNode()); + listener.onResponse(new ClearCcrRestoreSessionResponse()); } } - public static class Response extends BaseNodeResponse { - - private Response() { - } - - private Response(StreamInput in) throws IOException { - readFrom(in); - } - - private Response(DiscoveryNode node) { - super(node); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - } - - public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse { + public static class ClearCcrRestoreSessionResponse extends ActionResponse { ClearCcrRestoreSessionResponse() { } - ClearCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { - super(clusterName, chunkResponses, failures); - } - - @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readList(Response::new); - } - - @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeList(nodes); + ClearCcrRestoreSessionResponse(StreamInput in) { } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 11605970736b0..c9e51aceaf93d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -6,68 +6,44 @@ package org.elasticsearch.xpack.ccr.action.repositories; -import org.elasticsearch.action.support.nodes.BaseNodeRequest; -import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -public class ClearCcrRestoreSessionRequest extends BaseNodesRequest { +public class ClearCcrRestoreSessionRequest extends ActionRequest { - private Request request; + private String sessionUUID; - ClearCcrRestoreSessionRequest() { + ClearCcrRestoreSessionRequest(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); } - public ClearCcrRestoreSessionRequest(String nodeId, Request request) { - super(nodeId); - this.request = request; + public ClearCcrRestoreSessionRequest(String sessionUUID) { + this.sessionUUID = sessionUUID; } @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - request = new Request(); - request.readFrom(streamInput); + public ActionRequestValidationException validate() { + return null; } @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - request.writeTo(streamOutput); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); } - public Request getRequest() { - return request; + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); } - public static class Request extends BaseNodeRequest { - - private String sessionUUID; - - Request() { - } - - public Request(String nodeId, String sessionUUID) { - super(nodeId); - this.sessionUUID = sessionUUID; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - sessionUUID = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(sessionUUID); - } - - public String getSessionUUID() { - return sessionUUID; - } + public String getSessionUUID() { + return sessionUUID; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index 7f362aa3b766c..2a1b354f5d8ea 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.support.single.shard.TransportSingleShardAction; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -70,7 +71,7 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques throw new ShardNotFoundException(shardId); } ccrRestoreService.openSession(request.getSessionUUID(), indexShard); - return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId()); + return new PutCcrRestoreSessionResponse(clusterService.localNode()); } @Override @@ -93,34 +94,34 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { public static class PutCcrRestoreSessionResponse extends ActionResponse { - private String nodeId; + private DiscoveryNode node; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(String nodeId) { - this.nodeId = nodeId; + PutCcrRestoreSessionResponse(DiscoveryNode node) { + this.node = node; } PutCcrRestoreSessionResponse(StreamInput in) throws IOException { super(in); - nodeId = in.readString(); + node = new DiscoveryNode(in); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - nodeId = in.readString(); + node = new DiscoveryNode(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(nodeId); + node.writeTo(out); } - public String getNodeId() { - return nodeId; + public DiscoveryNode getNode() { + return node; } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index aeaa7fc5eaf57..ead1f57541767 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -35,6 +35,7 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotState; +import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; @@ -254,13 +255,13 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); - Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + RemoteClient remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); - String nodeId = response.getNodeId(); + DiscoveryNode node = response.getNode(); // TODO: Implement file restore - closeSession(remoteClient, nodeId, sessionUUID); + closeSession(remoteClient.routedToNode(node), sessionUUID); } @Override @@ -268,13 +269,9 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, - new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); + private void closeSession(RemoteClient remoteClient, String sessionUUID) { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID); ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); - if (response.hasFailures()) { - throw response.failures().get(0); - } } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 7228acaacf1a9..8ab30b465f57b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -7,7 +7,6 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -23,6 +22,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -59,7 +59,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollower() { - Client client = mock(Client.class); + RemoteClient client = mock(RemoteClient.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -130,7 +130,7 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa } public void testAutoFollowerClusterStateApiFailure() { - Client client = mock(Client.class); + RemoteClient client = mock(RemoteClient.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -181,7 +181,7 @@ void updateAutoFollowMetadata(Function updateFunctio } public void testAutoFollowerUpdateClusterStateFailure() { - Client client = mock(Client.class); + RemoteClient client = mock(RemoteClient.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -238,7 +238,7 @@ void updateAutoFollowMetadata(Function updateFunctio } public void testAutoFollowerCreateAndFollowApiCallFailure() { - Client client = mock(Client.class); + RemoteClient client = mock(RemoteClient.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -672,7 +672,7 @@ public void testUpdateAutoFollowersNoAutoFollowMetadata() { } public void testWaitForMetadataVersion() { - Client client = mock(Client.class); + RemoteClient client = mock(RemoteClient.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -732,7 +732,7 @@ void updateAutoFollowMetadata(Function updateFunctio } public void testWaitForTimeOut() { - Client client = mock(Client.class); + RemoteClient client = mock(RemoteClient.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 58ca42c7f681e..2045d012fd373 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.xpack.core.action.XPackInfoAction; import java.util.ArrayList; @@ -379,11 +380,11 @@ private ThreadPool createMockThreadPool() { return threadPool; } - private Client createMockClient(final ThreadPool threadPool) { + private RemoteClient createMockClient(final ThreadPool threadPool) { return createMockClient(threadPool, client -> when(client.getRemoteClusterClient(anyString())).thenReturn(client)); } - private Client createMockClientThatThrowsOnGetRemoteClusterClient(final ThreadPool threadPool, final String clusterAlias) { + private RemoteClient createMockClientThatThrowsOnGetRemoteClusterClient(final ThreadPool threadPool, final String clusterAlias) { return createMockClient( threadPool, client -> { @@ -392,8 +393,8 @@ private Client createMockClientThatThrowsOnGetRemoteClusterClient(final ThreadPo }); } - private Client createMockClient(final ThreadPool threadPool, final Consumer finish) { - final Client client = mock(Client.class); + private RemoteClient createMockClient(final ThreadPool threadPool, final Consumer finish) { + final RemoteClient client = mock(RemoteClient.class); when(client.threadPool()).thenReturn(threadPool); finish.accept(client); return client; From 1f4fe9e9eabfd54796bfd4c9089b8a2bc21eb735 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 19 Dec 2018 13:10:05 -0700 Subject: [PATCH 3/7] Changes from review --- .../java/org/elasticsearch/client/Client.java | 3 +-- .../elasticsearch/client/FilterClient.java | 3 +-- .../elasticsearch/client/node/NodeClient.java | 3 +-- .../transport/RemoteClusterAwareClient.java | 24 ++++++------------- ...nt.java => RemoteClusterAwareRequest.java} | 13 ++++------ .../transport/RemoteClusterService.java | 3 ++- .../ClearCcrRestoreSessionRequest.java | 15 +++++++++--- .../xpack/ccr/repository/CcrRepository.java | 9 ++++--- .../action/AutoFollowCoordinatorTests.java | 14 +++++------ .../RemoteClusterLicenseCheckerTests.java | 9 ++++--- 10 files changed, 43 insertions(+), 53 deletions(-) rename server/src/main/java/org/elasticsearch/transport/{RemoteClient.java => RemoteClusterAwareRequest.java} (73%) diff --git a/server/src/main/java/org/elasticsearch/client/Client.java b/server/src/main/java/org/elasticsearch/client/Client.java index 26719ac831f33..07871709f5726 100644 --- a/server/src/main/java/org/elasticsearch/client/Client.java +++ b/server/src/main/java/org/elasticsearch/client/Client.java @@ -67,7 +67,6 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.transport.RemoteClient; import java.util.Map; @@ -457,7 +456,7 @@ public interface Client extends ElasticsearchClient, Releasable { * @throws IllegalArgumentException if the given clusterAlias doesn't exist * @throws UnsupportedOperationException if this functionality is not available on this client. */ - default RemoteClient getRemoteClusterClient(String clusterAlias) { + default Client getRemoteClusterClient(String clusterAlias) { throw new UnsupportedOperationException("this client doesn't support remote cluster connections"); } } diff --git a/server/src/main/java/org/elasticsearch/client/FilterClient.java b/server/src/main/java/org/elasticsearch/client/FilterClient.java index 0fbc83bf6ab6c..b4230710414be 100644 --- a/server/src/main/java/org/elasticsearch/client/FilterClient.java +++ b/server/src/main/java/org/elasticsearch/client/FilterClient.java @@ -25,7 +25,6 @@ import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClient; /** * A {@link Client} that contains another {@link Client} which it @@ -74,7 +73,7 @@ protected Client in() { } @Override - public RemoteClient getRemoteClusterClient(String clusterAlias) { + public Client getRemoteClusterClient(String clusterAlias) { return in.getRemoteClusterClient(clusterAlias); } } diff --git a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java index 7a602b19e60c6..0ad863c936741 100644 --- a/server/src/main/java/org/elasticsearch/client/node/NodeClient.java +++ b/server/src/main/java/org/elasticsearch/client/node/NodeClient.java @@ -31,7 +31,6 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskListener; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.transport.RemoteClusterService; import java.util.Map; @@ -120,7 +119,7 @@ > TransportAction transportAction(Action action) { } @Override - public RemoteClient getRemoteClusterClient(String clusterAlias) { + public Client getRemoteClusterClient(String clusterAlias) { return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 02179caa6c495..2ca42ff85abdf 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -23,29 +23,23 @@ import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; -final class RemoteClusterAwareClient extends AbstractClient implements RemoteClient { +final class RemoteClusterAwareClient extends AbstractClient { private final TransportService service; private final String clusterAlias; private final RemoteClusterService remoteClusterService; - private final DiscoveryNode discoveryNode; RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias) { - this(settings, threadPool, service, clusterAlias, null); - } - - private RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, TransportService service, String clusterAlias, - DiscoveryNode discoveryNode) { super(settings, threadPool); this.service = service; this.clusterAlias = clusterAlias; this.remoteClusterService = service.getRemoteClusterService(); - this.discoveryNode = discoveryNode; } @Override @@ -53,10 +47,11 @@ private RemoteClusterAwareClient(Settings settings, ThreadPool threadPool, Trans void doExecute(Action action, Request request, ActionListener listener) { remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { Transport.Connection connection; - if (discoveryNode == null) { - connection = remoteClusterService.getConnection(clusterAlias); + if (request instanceof RemoteClusterAwareRequest) { + DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); + connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); } else { - connection = remoteClusterService.getConnection(discoveryNode, clusterAlias); + connection = remoteClusterService.getConnection(clusterAlias); } service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, action.getResponseReader())); @@ -70,12 +65,7 @@ public void close() { } @Override - public RemoteClient getRemoteClusterClient(String clusterAlias) { + public Client getRemoteClusterClient(String clusterAlias) { return remoteClusterService.getRemoteClusterClient(threadPool(), clusterAlias); } - - @Override - public RemoteClient routedToNode(DiscoveryNode node) { - return new RemoteClusterAwareClient(settings, threadPool(), service, clusterAlias, node); - } } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java similarity index 73% rename from server/src/main/java/org/elasticsearch/transport/RemoteClient.java rename to server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java index 286b6e47015d8..7dc504da413f6 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java @@ -16,18 +16,13 @@ * specific language governing permissions and limitations * under the License. */ + package org.elasticsearch.transport; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; -public interface RemoteClient extends Client { +public interface RemoteClusterAwareRequest { + + DiscoveryNode getPreferredTargetNode(); - /** - * Returns a remote client instance that will send requests to the specified node. - * - * @param node to route request to - * @return remote client - */ - RemoteClient routedToNode(DiscoveryNode node); } diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 93b2dabfe9f0d..cb802f13fdb50 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Strings; @@ -491,7 +492,7 @@ public void onFailure(Exception e) { * * @throws IllegalArgumentException if the given clusterAlias doesn't exist */ - public RemoteClient getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { + public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) { if (transportService.getRemoteClusterService().getRemoteClusterNames().contains(clusterAlias) == false) { throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]"); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index c9e51aceaf93d..6bf3d5c45afc5 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -8,13 +8,16 @@ import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.transport.RemoteClusterAwareRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -public class ClearCcrRestoreSessionRequest extends ActionRequest { +public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest { + private DiscoveryNode node; private String sessionUUID; ClearCcrRestoreSessionRequest(StreamInput in) throws IOException { @@ -22,8 +25,9 @@ public class ClearCcrRestoreSessionRequest extends ActionRequest { sessionUUID = in.readString(); } - public ClearCcrRestoreSessionRequest(String sessionUUID) { + public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) { this.sessionUUID = sessionUUID; + this.node = node; } @Override @@ -43,7 +47,12 @@ public void writeTo(StreamOutput out) throws IOException { out.writeString(sessionUUID); } - public String getSessionUUID() { + String getSessionUUID() { return sessionUUID; } + + @Override + public DiscoveryNode getPreferredTargetNode() { + return node; + } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index ead1f57541767..ec2274554b0a7 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -35,7 +35,6 @@ import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotShardFailure; import org.elasticsearch.snapshots.SnapshotState; -import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.xpack.ccr.Ccr; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; @@ -255,13 +254,13 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY); ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId()); - RemoteClient remoteClient = client.getRemoteClusterClient(remoteClusterAlias); + Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias); String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); DiscoveryNode node = response.getNode(); // TODO: Implement file restore - closeSession(remoteClient.routedToNode(node), sessionUUID); + closeSession(remoteClient, node, sessionUUID); } @Override @@ -269,8 +268,8 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Ve throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } - private void closeSession(RemoteClient remoteClient, String sessionUUID) { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID); + private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index 8ab30b465f57b..7228acaacf1a9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -7,6 +7,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -22,7 +23,6 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.xpack.ccr.CcrLicenseChecker; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; @@ -59,7 +59,7 @@ public class AutoFollowCoordinatorTests extends ESTestCase { public void testAutoFollower() { - RemoteClient client = mock(RemoteClient.class); + Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -130,7 +130,7 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa } public void testAutoFollowerClusterStateApiFailure() { - RemoteClient client = mock(RemoteClient.class); + Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -181,7 +181,7 @@ void updateAutoFollowMetadata(Function updateFunctio } public void testAutoFollowerUpdateClusterStateFailure() { - RemoteClient client = mock(RemoteClient.class); + Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -238,7 +238,7 @@ void updateAutoFollowMetadata(Function updateFunctio } public void testAutoFollowerCreateAndFollowApiCallFailure() { - RemoteClient client = mock(RemoteClient.class); + Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101"); @@ -672,7 +672,7 @@ public void testUpdateAutoFollowersNoAutoFollowMetadata() { } public void testWaitForMetadataVersion() { - RemoteClient client = mock(RemoteClient.class); + Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), @@ -732,7 +732,7 @@ void updateAutoFollowMetadata(Function updateFunctio } public void testWaitForTimeOut() { - RemoteClient client = mock(RemoteClient.class); + Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java index 2045d012fd373..58ca42c7f681e 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/RemoteClusterLicenseCheckerTests.java @@ -16,7 +16,6 @@ import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.transport.RemoteClient; import org.elasticsearch.xpack.core.action.XPackInfoAction; import java.util.ArrayList; @@ -380,11 +379,11 @@ private ThreadPool createMockThreadPool() { return threadPool; } - private RemoteClient createMockClient(final ThreadPool threadPool) { + private Client createMockClient(final ThreadPool threadPool) { return createMockClient(threadPool, client -> when(client.getRemoteClusterClient(anyString())).thenReturn(client)); } - private RemoteClient createMockClientThatThrowsOnGetRemoteClusterClient(final ThreadPool threadPool, final String clusterAlias) { + private Client createMockClientThatThrowsOnGetRemoteClusterClient(final ThreadPool threadPool, final String clusterAlias) { return createMockClient( threadPool, client -> { @@ -393,8 +392,8 @@ private RemoteClient createMockClientThatThrowsOnGetRemoteClusterClient(final Th }); } - private RemoteClient createMockClient(final ThreadPool threadPool, final Consumer finish) { - final RemoteClient client = mock(RemoteClient.class); + private Client createMockClient(final ThreadPool threadPool, final Consumer finish) { + final Client client = mock(Client.class); when(client.threadPool()).thenReturn(threadPool); finish.accept(client); return client; From 13b2f6eb5575389d7e4b48fb044243ef1085ffd7 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Wed, 19 Dec 2018 13:39:29 -0700 Subject: [PATCH 4/7] Add doc --- .../elasticsearch/transport/RemoteClusterAwareRequest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java index 7dc504da413f6..b708240f6daf9 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java @@ -23,6 +23,13 @@ public interface RemoteClusterAwareRequest { + /** + * Returns the preferred discovery node for this request. The remote cluster client will attempt to send + * this request directly to this node. Otherwise, it will send the request as a proxy action that will + * be routed by the remote cluster to this node. + * + * @return preferred discovery node + */ DiscoveryNode getPreferredTargetNode(); } From c2ae3309b2027d4b63a25620793ad6faa85d9df2 Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 15:17:38 -0700 Subject: [PATCH 5/7] Fixes --- .../repositories/ClearCcrRestoreSessionAction.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 686e029cfbc88..752ac5dd9767d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -9,12 +9,15 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; @@ -41,19 +44,26 @@ public static class TransportDeleteCcrRestoreSessionAction extends HandledTransportAction { private final CcrRestoreSourceService ccrRestoreService; + private final ThreadPool threadPool; @Inject public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new); + TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new); this.ccrRestoreService = ccrRestoreService; + this.threadPool = transportService.getThreadPool(); } @Override protected void doExecute(Task task, ClearCcrRestoreSessionRequest request, ActionListener listener) { - ccrRestoreService.closeSession(request.getSessionUUID()); - listener.onResponse(new ClearCcrRestoreSessionResponse()); + // TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch + // may be unnecessary when we remove these callbacks. + threadPool.generic().execute(() -> { + ccrRestoreService.closeSession(request.getSessionUUID()); + listener.onResponse(new ClearCcrRestoreSessionResponse()); + }); } } From 73556c50c9bd8edc8746521f004e7748e809b6cb Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 15:18:56 -0700 Subject: [PATCH 6/7] Changes from review --- .../ccr/action/repositories/ClearCcrRestoreSessionRequest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 6bf3d5c45afc5..b9d277ca1b49a 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -37,8 +37,7 @@ public ActionRequestValidationException validate() { @Override public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - sessionUUID = in.readString(); + throw new UnsupportedOperationException(); } @Override From 15540701c12ab112d954a3954bb9c8ae1ae2397d Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 15:49:14 -0700 Subject: [PATCH 7/7] Fix issue --- .../ccr/action/repositories/ClearCcrRestoreSessionAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 752ac5dd9767d..81cde2984f500 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -9,7 +9,6 @@ import org.elasticsearch.action.Action; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.search.SearchTransportService; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.common.inject.Inject;