Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public abstract class AcknowledgedRequest<Request extends MasterNodeRequest<Requ
protected AcknowledgedRequest() {
}

protected AcknowledgedRequest(StreamInput in) throws IOException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change is not really related to the unfollow api, but it allows this: https://github.com/elastic/elasticsearch/pull/34132/files#diff-bd430812001c493a52293cc54e56ea24R36 (immutable fields)

and I think it good that at some point we can not use Streamable anymore. I want to go over other ccr request classes in a follow up change.

super(in);
this.timeout = in.readTimeValue();
}

/**
* Allows to set the timeout
* @param timeout timeout as a string (e.g. 1s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,10 @@ public Builder putCustom(String type, Map<String, String> customIndexMetaData) {
return this;
}

public Map<String, String> removeCustom(String type) {
return this.customMetaData.remove(type);
}

public Set<String> getInSyncAllocationIds(int shardId) {
return inSyncAllocationIds.get(shardId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@
ccr.pause_follow:
index: bar
- is_true: acknowledged

- do:
indices.close:
index: bar
- is_true: acknowledged

- do:
ccr.unfollow:
index: bar
- is_true: acknowledged
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,11 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
import org.elasticsearch.xpack.ccr.action.TransportAutoFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestAutoFollowStatsAction;
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
Expand Down Expand Up @@ -72,6 +74,7 @@
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -164,6 +167,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class),
// auto-follow actions
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
Expand All @@ -186,6 +190,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
new RestPutFollowAction(settings, restController),
new RestResumeFollowAction(settings, restController),
new RestPauseFollowAction(settings, restController),
new RestUnfollowAction(settings, restController),
// auto-follow APIs
new RestDeleteAutoFollowPatternAction(settings, restController),
new RestPutAutoFollowPatternAction(settings, restController),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowAction.Request, AcknowledgedResponse> {

@Inject
public TransportUnfollowAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
UnfollowAction.Request::new, indexNameExpressionResolver);
}

@Override
protected String executor() {
return ThreadPool.Names.SAME;
}

@Override
protected AcknowledgedResponse newResponse() {
return new AcknowledgedResponse();
}

@Override
protected void masterOperation(UnfollowAction.Request request,
ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() {

@Override
public ClusterState execute(ClusterState current) throws Exception {
String followerIndex = request.getFollowerIndex();
return unfollow(followerIndex, current);
}

@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(true));
}
});
}

@Override
protected ClusterBlockException checkBlock(UnfollowAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

static ClusterState unfollow(String followerIndex, ClusterState current) {
IndexMetaData followerIMD = current.metaData().index(followerIndex);

PersistentTasksCustomMetaData persistentTasks = current.metaData().custom(PersistentTasksCustomMetaData.TYPE);
if (persistentTasks != null) {
for (PersistentTasksCustomMetaData.PersistentTask<?> persistentTask : persistentTasks.tasks()) {
if (persistentTask.getTaskName().equals(ShardFollowTask.NAME)) {
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
if (shardFollowTask.getFollowShardId().getIndexName().equals(followerIndex)) {
throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex +
"] to a non-follower, because it has not been paused");
}
}
}
}

if (followerIMD.getState() != IndexMetaData.State.CLOSE) {
throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex +
"] to a non-follower, because it has not been closed");
}

IndexMetaData.Builder newIMD = IndexMetaData.builder(followerIMD);
// Remove index.xpack.ccr.following_index setting
Settings.Builder builder = Settings.builder();
builder.put(followerIMD.getSettings());
builder.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());

newIMD.settings(builder);
// Remove ccr custom metadata
newIMD.removeCustom(Ccr.CCR_CUSTOM_METADATA_KEY);

MetaData newMetaData = MetaData.builder(current.metaData())
.put(newIMD)
.build();
return ClusterState.builder(current)
.metaData(newMetaData)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.rest;

import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.io.IOException;

import static org.elasticsearch.xpack.core.ccr.action.UnfollowAction.INSTANCE;

public class RestUnfollowAction extends BaseRestHandler {

public RestUnfollowAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/unfollow", this);
}

@Override
public String getName() {
return "ccr_unfollow_action";
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
UnfollowAction.Request request = new UnfollowAction.Request(restRequest.param("index"));
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
Expand Down Expand Up @@ -60,6 +61,7 @@
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -655,6 +657,34 @@ public void testDeleteFollowerIndex() throws Exception {
ensureNoCcrTasks();
}

public void testUnfollowIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2");
PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest);
client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get();
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
assertBusy(() -> {
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L));
});

// Indexing directly into index2 would fail now, because index2 is a follow index.
// We can't test this here because an assertion trips before an actual error is thrown and then index call hangs.

// Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index:
unfollowIndex("index2");
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
assertAcked(client().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
ensureGreen("index2");

// Indexing succeeds now, because index2 is no longer a follow index:
client().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.get();
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
}

private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrSettings;

import java.util.Collections;
import java.util.HashMap;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class TransportUnfollowActionTests extends ESTestCase {

public void testUnfollow() {
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.state(IndexMetaData.State.CLOSE)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());

ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(MetaData.builder()
.put(followerIndex)
.build())
.build();
ClusterState result = TransportUnfollowAction.unfollow("follow_index", current);

IndexMetaData resultIMD = result.metaData().index("follow_index");
assertThat(resultIMD.getSettings().get(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()), nullValue());
assertThat(resultIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY), nullValue());
}

public void testUnfollowIndexOpen() {
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());

ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(MetaData.builder()
.put(followerIndex)
.build())
.build();
Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current));
assertThat(e.getMessage(),
equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been closed"));
}

public void testUnfollowRunningShardFollowTasks() {
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
.numberOfShards(1)
.numberOfReplicas(0)
.state(IndexMetaData.State.CLOSE)
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());


ShardFollowTask params = new ShardFollowTask(
null,
new ShardId("follow_index", "", 0),
new ShardId("leader_index", "", 0),
1024,
1,
TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
1,
10240,
TimeValue.timeValueMillis(10),
TimeValue.timeValueMillis(10),
"uuid",
Collections.emptyMap()
);
PersistentTasksCustomMetaData.PersistentTask<?> task =
new PersistentTasksCustomMetaData.PersistentTask<>("id", ShardFollowTask.NAME, params, 0, null);

ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
.metaData(MetaData.builder()
.put(followerIndex)
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0, Collections.singletonMap("id", task)))
.build())
.build();
Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current));
assertThat(e.getMessage(),
equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been paused"));
}

}
Loading