Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.support.AbstractClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;

Expand All @@ -45,14 +46,19 @@ final class RemoteClusterAwareClient extends AbstractClient {
protected <Request extends ActionRequest, Response extends ActionResponse>
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
remoteClusterService.ensureConnected(clusterAlias, ActionListener.wrap(res -> {
Transport.Connection connection = remoteClusterService.getConnection(clusterAlias);
Transport.Connection connection;
if (request instanceof RemoteClusterAwareRequest) {
DiscoveryNode preferredTargetNode = ((RemoteClusterAwareRequest) request).getPreferredTargetNode();
connection = remoteClusterService.getConnection(preferredTargetNode, clusterAlias);
} else {
connection = remoteClusterService.getConnection(clusterAlias);
}
service.sendRequest(connection, action.name(), request, TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(listener, action.getResponseReader()));
},
listener::onFailure));
}


@Override
public void close() {
// do nothing
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.transport;

import org.elasticsearch.cluster.node.DiscoveryNode;

public interface RemoteClusterAwareRequest {

/**
* Returns the preferred discovery node for this request. The remote cluster client will attempt to send
* this request directly to this node. Otherwise, it will send the request as a proxy action that will
* be routed by the remote cluster to this node.
*
* @return preferred discovery node
*/
DiscoveryNode getPreferredTargetNode();

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,19 @@
package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.Action;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.TransportNodesAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.List;

public class ClearCcrRestoreSessionAction extends Action<ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse> {

public static final ClearCcrRestoreSessionAction INSTANCE = new ClearCcrRestoreSessionAction();
Expand All @@ -36,86 +31,47 @@ private ClearCcrRestoreSessionAction() {

@Override
public ClearCcrRestoreSessionResponse newResponse() {
return new ClearCcrRestoreSessionResponse();
throw new UnsupportedOperationException();
}

public static class TransportDeleteCcrRestoreSessionAction extends TransportNodesAction<ClearCcrRestoreSessionRequest,
ClearCcrRestoreSessionResponse, ClearCcrRestoreSessionRequest.Request, Response> {
@Override
public Writeable.Reader<ClearCcrRestoreSessionResponse> getResponseReader() {
return ClearCcrRestoreSessionResponse::new;
}

public static class TransportDeleteCcrRestoreSessionAction
extends HandledTransportAction<ClearCcrRestoreSessionRequest, ClearCcrRestoreSessionResponse> {

private final CcrRestoreSourceService ccrRestoreService;
private final ThreadPool threadPool;

@Inject
public TransportDeleteCcrRestoreSessionAction(ThreadPool threadPool, ClusterService clusterService, ActionFilters actionFilters,
TransportService transportService, CcrRestoreSourceService ccrRestoreService) {
super(NAME, threadPool, clusterService, transportService, actionFilters, ClearCcrRestoreSessionRequest::new,
ClearCcrRestoreSessionRequest.Request::new, ThreadPool.Names.GENERIC, Response.class);
public TransportDeleteCcrRestoreSessionAction(ActionFilters actionFilters, TransportService transportService,
CcrRestoreSourceService ccrRestoreService) {
super(NAME, transportService, actionFilters, ClearCcrRestoreSessionRequest::new);
TransportActionProxy.registerProxyAction(transportService, NAME, ClearCcrRestoreSessionResponse::new);
this.ccrRestoreService = ccrRestoreService;
this.threadPool = transportService.getThreadPool();
}

@Override
protected ClearCcrRestoreSessionResponse newResponse(ClearCcrRestoreSessionRequest request, List<Response> responses,
List<FailedNodeException> failures) {
return new ClearCcrRestoreSessionResponse(clusterService.getClusterName(), responses, failures);
}

@Override
protected ClearCcrRestoreSessionRequest.Request newNodeRequest(String nodeId, ClearCcrRestoreSessionRequest request) {
return request.getRequest();
}

@Override
protected Response newNodeResponse() {
return new Response();
}

@Override
protected Response nodeOperation(ClearCcrRestoreSessionRequest.Request request) {
ccrRestoreService.closeSession(request.getSessionUUID());
return new Response(clusterService.localNode());
protected void doExecute(Task task, ClearCcrRestoreSessionRequest request,
ActionListener<ClearCcrRestoreSessionResponse> listener) {
// TODO: Currently blocking actions might occur in the session closed callbacks. This dispatch
// may be unnecessary when we remove these callbacks.
threadPool.generic().execute(() -> {
ccrRestoreService.closeSession(request.getSessionUUID());
listener.onResponse(new ClearCcrRestoreSessionResponse());
});
}
}

public static class Response extends BaseNodeResponse {

private Response() {
}

private Response(StreamInput in) throws IOException {
readFrom(in);
}

private Response(DiscoveryNode node) {
super(node);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
}
}

public static class ClearCcrRestoreSessionResponse extends BaseNodesResponse<Response> {
public static class ClearCcrRestoreSessionResponse extends ActionResponse {

ClearCcrRestoreSessionResponse() {
}

ClearCcrRestoreSessionResponse(ClusterName clusterName, List<Response> chunkResponses, List<FailedNodeException> failures) {
super(clusterName, chunkResponses, failures);
}

@Override
protected List<Response> readNodesFrom(StreamInput in) throws IOException {
return in.readList(Response::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<Response> nodes) throws IOException {
out.writeList(nodes);
ClearCcrRestoreSessionResponse(StreamInput in) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,68 +6,52 @@

package org.elasticsearch.xpack.ccr.action.repositories;

import org.elasticsearch.action.support.nodes.BaseNodeRequest;
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.transport.RemoteClusterAwareRequest;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;

public class ClearCcrRestoreSessionRequest extends BaseNodesRequest<ClearCcrRestoreSessionRequest> {
public class ClearCcrRestoreSessionRequest extends ActionRequest implements RemoteClusterAwareRequest {

private Request request;
private DiscoveryNode node;
private String sessionUUID;

ClearCcrRestoreSessionRequest() {
ClearCcrRestoreSessionRequest(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
}

public ClearCcrRestoreSessionRequest(String nodeId, Request request) {
super(nodeId);
this.request = request;
public ClearCcrRestoreSessionRequest(String sessionUUID, DiscoveryNode node) {
this.sessionUUID = sessionUUID;
this.node = node;
}

@Override
public void readFrom(StreamInput streamInput) throws IOException {
super.readFrom(streamInput);
request = new Request();
request.readFrom(streamInput);
public ActionRequestValidationException validate() {
return null;
}

@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
request.writeTo(streamOutput);
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}

public Request getRequest() {
return request;
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
}

public static class Request extends BaseNodeRequest {

private String sessionUUID;

Request() {
}

public Request(String nodeId, String sessionUUID) {
super(nodeId);
this.sessionUUID = sessionUUID;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
sessionUUID = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(sessionUUID);
}
String getSessionUUID() {
return sessionUUID;
}

public String getSessionUUID() {
return sessionUUID;
}
@Override
public DiscoveryNode getPreferredTargetNode() {
return node;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.action.support.single.shard.TransportSingleShardAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -70,7 +71,7 @@ protected PutCcrRestoreSessionResponse shardOperation(PutCcrRestoreSessionReques
throw new ShardNotFoundException(shardId);
}
ccrRestoreService.openSession(request.getSessionUUID(), indexShard);
return new PutCcrRestoreSessionResponse(indexShard.routingEntry().currentNodeId());
return new PutCcrRestoreSessionResponse(clusterService.localNode());
}

@Override
Expand All @@ -93,34 +94,34 @@ protected ShardsIterator shards(ClusterState state, InternalRequest request) {

public static class PutCcrRestoreSessionResponse extends ActionResponse {

private String nodeId;
private DiscoveryNode node;

PutCcrRestoreSessionResponse() {
}

PutCcrRestoreSessionResponse(String nodeId) {
this.nodeId = nodeId;
PutCcrRestoreSessionResponse(DiscoveryNode node) {
this.node = node;
}

PutCcrRestoreSessionResponse(StreamInput in) throws IOException {
super(in);
nodeId = in.readString();
node = new DiscoveryNode(in);
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodeId = in.readString();
node = new DiscoveryNode(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(nodeId);
node.writeTo(out);
}

public String getNodeId() {
return nodeId;
public DiscoveryNode getNode() {
return node;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,9 +253,9 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
String sessionUUID = UUIDs.randomBase64UUID();
PutCcrRestoreSessionAction.PutCcrRestoreSessionResponse response = remoteClient.execute(PutCcrRestoreSessionAction.INSTANCE,
new PutCcrRestoreSessionRequest(sessionUUID, leaderShardId, recoveryMetadata)).actionGet();
String nodeId = response.getNodeId();
DiscoveryNode node = response.getNode();
// TODO: Implement file restore
closeSession(remoteClient, nodeId, sessionUUID);
closeSession(remoteClient, node, sessionUUID);
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
}

Expand All @@ -278,13 +278,9 @@ private void maybeUpdateMappings(Client localClient, Client remoteClient, Index
}
}

private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));
private void closeSession(Client remoteClient, DiscoveryNode node, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(sessionUUID, node);
ClearCcrRestoreSessionAction.ClearCcrRestoreSessionResponse response =
remoteClient.execute(ClearCcrRestoreSessionAction.INSTANCE, clearRequest).actionGet();
if (response.hasFailures()) {
throw response.failures().get(0);
}
}
}