From cac6612d7e1f65f7aa148c8ea8846abef726d3ca Mon Sep 17 00:00:00 2001 From: Tim Brooks Date: Thu, 20 Dec 2018 17:43:12 -0700 Subject: [PATCH] Send clear session as routable remote request (#36805) This commit adds a RemoteClusterAwareRequest interface that allows a request to specify which remote node it should be routed to. The remote cluster aware client will attempt to route the request directly to this node. Otherwise it will send it as a proxy action to eventually end up on the requested node. It implements the ccr clean_session action with this client. --- .../transport/RemoteClusterAwareClient.java | 10 +- .../transport/RemoteClusterAwareRequest.java | 35 +++++++ .../ClearCcrRestoreSessionAction.java | 93 +++++-------------- .../ClearCcrRestoreSessionRequest.java | 67 +++++-------- .../PutCcrRestoreSessionAction.java | 19 ++-- .../xpack/ccr/repository/CcrRepository.java | 12 +-- 6 files changed, 104 insertions(+), 132 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java index 413036d70233e..7ec3706100c87 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareClient.java @@ -26,6 +26,7 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.client.Client; import org.elasticsearch.client.support.AbstractClient; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; @@ -47,14 +48,19 @@ final class RemoteClusterAwareClient extends AbstractClient { ActionRequestBuilder> void doExecute(Action action, Request request, ActionListener listener) { remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> { - Transport.Connection connection = remoteClusterService.getConnection(clusterAlias); + Transport.Connection connection; + if (request instanceof RemoteClusterAwareRequest) { + DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode(); + connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias); + } else { + connection = remoteClusterService.getConnection(clusterAlias); + } service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY, new ActionListenerResponseHandler<>(listener, action.getResponseReader())); }, listener::onFailure)); } - @Override public void close() { // do nothing diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java new file mode 100644 index 0000000000000..b708240f6daf9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterAwareRequest.java @@ -0,0 +1,35 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport; + +import org.elasticsearch.cluster.node.DiscoveryNode; + +public interface RemoteClusterAwareRequest { + + /** + * Returns the preferred discovery node for this request. The remote cluster client will attempt to send + * this request directly to this node. Otherwise, it will send the request as a proxy action that will + * be routed by the remote cluster to this node. + * + * @return preferred discovery node + */ + DiscoveryNode getPreferredTargetNode(); + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java index 5ae4f38c3d0fa..40a7d67976318 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionAction.java @@ -7,26 +7,21 @@ package org.elasticsearch.xpack.ccr.action.repositories; import org.elasticsearch.action.Action; -import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.nodes.BaseNodeResponse; -import org.elasticsearch.action.support.nodes.BaseNodesResponse; -import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.client.ElasticsearchClient; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import java.io.IOException; -import java.util.List; public class ClearCcrRestoreSessionAction extends Action { @@ -48,84 +43,40 @@ public ClearCcrRestoreSessionRequestBuilder newRequestBuilder(ElasticsearchClien return new ClearCcrRestoreSessionRequestBuilder(client); } - public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction { + public static class TransportDeleteCcrRestoreSessionAction + extends HandledTransportAction { private final CcrRestoreSourceService ccrRestoreService; + private final ThreadPool threadPool; @Inject - public TransportDeleteCcrRestoreSessionAction(Settings settings, ThreadPool threadPool, ClusterService clusterService, - ActionFilters actionFilters, IndexNameExpressionResolver resolver, + public TransportDeleteCcrRestoreSessionAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver resolver, TransportService transportService, CcrRestoreSourceService ccrRestoreService) { - super(settings, NAME, threadPool, clusterService, transportService, actionFilters, resolver, - ClearCcrRestoreSessionRequest::new, ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class); + super(settings, NAME, threadPool, transportService, actionFilters, resolver, ClearCcrRestoreSessionRequest::new); + TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new); this.ccrRestoreService = ccrRestoreService; + this.threadPool = transportService.getThreadPool(); } @Override - protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List responses, - List failures) { - return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures); - } - - @Override - protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) { - return request.getRequest(); - } - - @Override - protected Response newNodeResponse() { - return new Response(); - } - - @Override - protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) { - ccrRestoreService.closeSession(request.getSessionUUID()); - return new Response(clusterService.localNode()); + protected void doExecute(ClearCcrRestoreSessionRequest request, ActionListener listener) { + // TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch + // may be unnecessary when we remove these callbacks. + threadPool.generic().execute(() -> { + ccrRestoreService.closeSession(request.getSessionUUID()); + listener.onResponse(new ClearCcrRestoreSessionResponse()); + }); } } - public static class Response extends BaseNodeResponse { - - private Response() { - } - - private Response(StreamInput in) throws IOException { - readFrom(in); - } - - private Response(DiscoveryNode node) { - super(node); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - } - } - - public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse { + public static class ClearCcrRestoreSessionResponse extends ActionResponse { ClearCcrRestoreSessionResponse() { } - ClearCcrRestoreSessionResponse(ClusterName clusterName, List chunkResponses, List failures) { - super(clusterName, chunkResponses, failures); - } - - @Override - protected List readNodesFrom(StreamInput in) throws IOException { - return in.readList(Response::new); - } - - @Override - protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { - out.writeList(nodes); + ClearCcrRestoreSessionResponse(StreamInput in) throws IOException { + super(in); } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java index 11605970736b0..96168bb697d74 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/ClearCcrRestoreSessionRequest.java @@ -6,68 +6,51 @@ package org.elasticsearch.xpack.ccr.action.repositories; -import org.elasticsearch.action.support.nodes.BaseNodeRequest; -import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.transport.RemoteClusterAwareRequest; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; -public class ClearCcrRestoreSessionRequest extends BaseNodesRequest { +public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest { - private Request request; + private DiscoveryNode node; + private String sessionUUID; ClearCcrRestoreSessionRequest() { } - public ClearCcrRestoreSessionRequest(String nodeId, Request request) { - super(nodeId); - this.request = request; + public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) { + this.sessionUUID = sessionUUID; + this.node = node; } @Override - public void readFrom(StreamInput streamInput) throws IOException { - super.readFrom(streamInput); - request = new Request(); - request.readFrom(streamInput); + public ActionRequestValidationException validate() { + return null; } @Override - public void writeTo(StreamOutput streamOutput) throws IOException { - super.writeTo(streamOutput); - request.writeTo(streamOutput); + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + sessionUUID = in.readString(); } - public Request getRequest() { - return request; + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(sessionUUID); } - public static class Request extends BaseNodeRequest { - - private String sessionUUID; - - Request() { - } - - public Request(String nodeId, String sessionUUID) { - super(nodeId); - this.sessionUUID = sessionUUID; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - sessionUUID = in.readString(); - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - out.writeString(sessionUUID); - } + String getSessionUUID() { + return sessionUUID; + } - public String getSessionUUID() { - return sessionUUID; - } + @Override + public DiscoveryNode getPreferredTargetNode() { + return node; } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java index f79081f731fec..dd0d523cea9c1 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/PutCcrRestoreSessionAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; @@ -79,7 +80,7 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques throw new ShardNotFoundException(shardId); } ccrRestoreService.openSession(request.getSessionUUID(), indexShard); - return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId()); + return new PutCcrRestoreSessionResponse(clusterService.localNode()); } @Override @@ -102,34 +103,34 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) { public static class PutCcrRestoreSessionResponse extends ActionResponse { - private String nodeId; + private DiscoveryNode node; PutCcrRestoreSessionResponse() { } - PutCcrRestoreSessionResponse(String nodeId) { - this.nodeId = nodeId; + PutCcrRestoreSessionResponse(DiscoveryNode node) { + this.node = node; } PutCcrRestoreSessionResponse(StreamInput in) throws IOException { super(in); - nodeId = in.readString(); + node = new DiscoveryNode(in); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); - nodeId = in.readString(); + node = new DiscoveryNode(in); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); - out.writeString(nodeId); + node.writeTo(out); } - public String getNodeId() { - return nodeId; + public DiscoveryNode getNode() { + return node; } } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 1dcf2d51aea73..533761e840205 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -253,9 +253,9 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v String sessionUUID = UUIDs.randomBase64UUID(); PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE, new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet(); - String nodeId = response.getNodeId(); + DiscoveryNode node = response.getNode(); // TODO: Implement file restore - closeSession(remoteClient, nodeId, sessionUUID); + closeSession(remoteClient, node, sessionUUID); maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings()); } @@ -280,13 +280,9 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index } } - private void closeSession(Client remoteClient, String nodeId, String sessionUUID) { - ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId, - new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID)); + private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) { + ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node); ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response = remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet(); - if (response.hasFailures()) { - throw response.failures().get(0); - } } }