From d9caa3e716a80a9961b34e7c20b6b00bf90bd914 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 28 Sep 2018 08:53:31 +0200 Subject: [PATCH 1/2] [CCR] Add unfollow API The unfollow API changes a follower index into a regular index, so that it will accept write requests from clients. For the unfollow api to work the index follow needs to be stopped and the index needs to be closed. Closes #33931 --- .../support/master/AcknowledgedRequest.java | 5 + .../cluster/metadata/IndexMetaData.java | 4 + .../test/ccr/follow_and_unfollow.yml | 10 ++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 5 + .../ccr/action/TransportUnfollowAction.java | 116 ++++++++++++++++++ .../xpack/ccr/rest/RestUnfollowAction.java | 39 ++++++ .../xpack/ccr/ShardChangesIT.java | 31 +++++ .../action/TransportUnfollowActionTests.java | 101 +++++++++++++++ .../xpack/core/ccr/action/UnfollowAction.java | 67 ++++++++++ .../xpack/core/ccr/client/CcrClient.java | 11 ++ .../rest-api-spec/api/ccr.unfollow.json | 17 +++ 11 files changed, 406 insertions(+) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json diff --git a/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java b/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java index e4f0353d0e20d..93a259a443679 100644 --- a/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java +++ b/server/src/main/java/org/elasticsearch/action/support/master/AcknowledgedRequest.java @@ -41,6 +41,11 @@ public abstract class AcknowledgedRequest customIndexMetaData) { return this; } + public Map removeCustom(String type) { + return this.customMetaData.remove(type); + } + public Set getInSyncAllocationIds(int shardId) { return inSyncAllocationIds.get(shardId); } diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml index 9aa072a09bcf8..ab60b2e49482a 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml @@ -40,3 +40,13 @@ ccr.pause_follow: index: bar - is_true: acknowledged + + - do: + indices.close: + index: bar + - is_true: acknowledged + + - do: + ccr.unfollow: + index: bar + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index d2d769d4269b8..61db5954496e0 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -41,9 +41,11 @@ import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportAutoFollowStatsAction; import org.elasticsearch.xpack.ccr.rest.RestAutoFollowStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; @@ -72,6 +74,7 @@ import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Arrays; import java.util.Collection; @@ -164,6 +167,7 @@ public List> getPersistentTasksExecutor(ClusterServic new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class), new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class), new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class), + new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class), // auto-follow actions new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), @@ -186,6 +190,7 @@ public List getRestHandlers(Settings settings, RestController restC new RestPutFollowAction(settings, restController), new RestResumeFollowAction(settings, restController), new RestPauseFollowAction(settings, restController), + new RestUnfollowAction(settings, restController), // auto-follow APIs new RestDeleteAutoFollowPatternAction(settings, restController), new RestPutAutoFollowPatternAction(settings, restController), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java new file mode 100644 index 0000000000000..76d972b9ac9a9 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; + +public class TransportUnfollowAction extends TransportMasterNodeAction { + + @Inject + public TransportUnfollowAction(Settings settings, TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(settings, UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters, + UnfollowAction.Request::new, indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + @Override + protected void masterOperation(UnfollowAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() { + + @Override + public ClusterState execute(ClusterState current) throws Exception { + String followerIndex = request.getFollowerIndex(); + return unfollow(followerIndex, current); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + listener.onResponse(new AcknowledgedResponse(true)); + } + }); + } + + @Override + protected ClusterBlockException checkBlock(UnfollowAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + static ClusterState unfollow(String followerIndex, ClusterState current) { + IndexMetaData followerIMD = current.metaData().index(followerIndex); + + PersistentTasksCustomMetaData persistentTasks = current.metaData().custom(PersistentTasksCustomMetaData.TYPE); + if (persistentTasks != null) { + for (PersistentTasksCustomMetaData.PersistentTask persistentTask : persistentTasks.tasks()) { + if (persistentTask.getTaskName().equals(ShardFollowTask.NAME)) { + ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); + if (shardFollowTask.getFollowShardId().getIndexName().equals(followerIndex)) { + throw new IllegalArgumentException("cannot unfollow index [" + followerIndex + + "], because it has not been paused"); + } + } + } + } + + if (followerIMD.getState() != IndexMetaData.State.CLOSE) { + throw new IllegalArgumentException("cannot unfollow index [" + followerIndex + "], because it has not been closed"); + } + + IndexMetaData.Builder newIMD = IndexMetaData.builder(followerIMD); + // Remove index.xpack.ccr.following_index setting + Settings.Builder builder = Settings.builder(); + builder.put(followerIMD.getSettings()); + builder.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()); + + newIMD.settings(builder); + // Remove ccr custom metadata + newIMD.removeCustom(Ccr.CCR_CUSTOM_METADATA_KEY); + + MetaData newMetaData = MetaData.builder(current.metaData()) + .put(newIMD) + .build(); + return ClusterState.builder(current) + .metaData(newMetaData) + .build(); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java new file mode 100644 index 0000000000000..127d06eb751d5 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestUnfollowAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; + +import java.io.IOException; + +import static org.elasticsearch.xpack.core.ccr.action.UnfollowAction.INSTANCE; + +public class RestUnfollowAction extends BaseRestHandler { + + public RestUnfollowAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/unfollow", this); + } + + @Override + public String getName() { + return "ccr_unfollow_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + UnfollowAction.Request request = new UnfollowAction.Request(restRequest.param("index")); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index a3f4c54606b09..3fbe3b2d656f0 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest; import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; +import org.elasticsearch.action.admin.indices.open.OpenIndexRequest; import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.bulk.BulkProcessor; @@ -20,6 +21,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.analysis.common.CommonAnalysisPlugin; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -54,6 +56,7 @@ import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.io.IOException; import java.util.Arrays; @@ -542,6 +545,34 @@ public void testAttemptToChangeCcrFollowingIndexSetting() throws Exception { "this setting is managed via a dedicated API")); } + public void testUnfollowIndex() throws Exception { + String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get()); + ResumeFollowAction.Request followRequest = createFollowRequest("index1", "index2"); + PutFollowAction.Request createAndFollowRequest = new PutFollowAction.Request(followRequest); + client().execute(PutFollowAction.INSTANCE, createAndFollowRequest).get(); + client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get(); + assertBusy(() -> { + assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L)); + }); + + // Indexing directly into index2 would fail now, because index2 is a follow index. + // We can't test this here because an assertion trips before an actual error is thrown and then index call hangs. + + // Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index: + unfollowIndex("index2"); + client().admin().indices().close(new CloseIndexRequest("index2")).actionGet(); + assertAcked(client().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet()); + client().admin().indices().open(new OpenIndexRequest("index2")).actionGet(); + ensureGreen("index2"); + + // Indexing succeeds now, because index2 is no longer a follow index: + client().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .get(); + assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L)); + } + private CheckedRunnable assertTask(final int numberOfPrimaryShards, final Map numDocsPerShard) { return () -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java new file mode 100644 index 0000000000000..b2624839d8728 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java @@ -0,0 +1,101 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.ccr.CcrSettings; + +import java.util.Collections; +import java.util.HashMap; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class TransportUnfollowActionTests extends ESTestCase { + + public void testUnfollow() { + IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index") + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .state(IndexMetaData.State.CLOSE) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + + ClusterState current = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(MetaData.builder() + .put(followerIndex) + .build()) + .build(); + ClusterState result = TransportUnfollowAction.unfollow("follow_index", current); + + IndexMetaData resultIMD = result.metaData().index("follow_index"); + assertThat(resultIMD.getSettings().get(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()), nullValue()); + assertThat(resultIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY), nullValue()); + } + + public void testUnfollowIndexOpen() { + IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index") + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + + ClusterState current = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(MetaData.builder() + .put(followerIndex) + .build()) + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current)); + assertThat(e.getMessage(), equalTo("cannot unfollow index [follow_index], because it has not been closed")); + } + + public void testUnfollowRunningShardFollowTasks() { + IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index") + .settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)) + .numberOfShards(1) + .numberOfReplicas(0) + .state(IndexMetaData.State.CLOSE) + .putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>()); + + + ShardFollowTask params = new ShardFollowTask( + null, + new ShardId("follow_index", "", 0), + new ShardId("leader_index", "", 0), + 1024, + 1, + TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + 1, + 10240, + TimeValue.timeValueMillis(10), + TimeValue.timeValueMillis(10), + "uuid", + Collections.emptyMap() + ); + PersistentTasksCustomMetaData.PersistentTask task = + new PersistentTasksCustomMetaData.PersistentTask<>("id", ShardFollowTask.NAME, params, 0, null); + + ClusterState current = ClusterState.builder(new ClusterName("cluster_name")) + .metaData(MetaData.builder() + .put(followerIndex) + .putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0, Collections.singletonMap("id", task))) + .build()) + .build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current)); + assertThat(e.getMessage(), equalTo("cannot unfollow index [follow_index], because it has not been paused")); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java new file mode 100644 index 0000000000000..cf8c9ec2e6101 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/UnfollowAction.java @@ -0,0 +1,67 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class UnfollowAction extends Action { + + public static final UnfollowAction INSTANCE = new UnfollowAction(); + public static final String NAME = "cluster:admin/xpack/ccr/unfollow"; + + private UnfollowAction() { + super(NAME); + } + + @Override + public AcknowledgedResponse newResponse() { + return new AcknowledgedResponse(); + } + + public static class Request extends AcknowledgedRequest { + + private final String followerIndex; + + public Request(String followerIndex) { + this.followerIndex = followerIndex; + } + + public Request(StreamInput in) throws IOException { + super(in); + followerIndex = in.readString(); + } + + public String getFollowerIndex() { + return followerIndex; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException e = null; + if (followerIndex == null) { + e = addValidationError("follower index is missing", e); + } + return e; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(followerIndex); + } + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index dd4af08978bfa..1dab97599dfc2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -19,6 +19,7 @@ import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; +import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import java.util.Objects; @@ -85,6 +86,16 @@ public ActionFuture pauseFollow(final PauseFollowAction.Re return listener; } + public void unfollow(final UnfollowAction.Request request, final ActionListener listener) { + client.execute(UnfollowAction.INSTANCE, request, listener); + } + + public ActionFuture unfollow(final UnfollowAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(UnfollowAction.INSTANCE, request, listener); + return listener; + } + public void putAutoFollowPattern( final PutAutoFollowPatternAction.Request request, final ActionListener listener) { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json new file mode 100644 index 0000000000000..41be574421fc6 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.unfollow.json @@ -0,0 +1,17 @@ +{ + "ccr.unfollow": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "POST" ], + "url": { + "path": "/{index}/_ccr/unfollow", + "paths": [ "/{index}/_ccr/unfollow" ], + "parts": { + "index": { + "type": "string", + "required": true, + "description": "The name of the follower index that should be turned into a regular index." + } + } + } + } +} From 78466f18c9ed4c7fad92f8ec40a0ee121fc5baeb Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 28 Sep 2018 17:05:52 +0200 Subject: [PATCH 2/2] improved error message --- .../xpack/ccr/action/TransportUnfollowAction.java | 7 ++++--- .../xpack/ccr/action/TransportUnfollowActionTests.java | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java index 76d972b9ac9a9..1ce01f7ab0953 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowAction.java @@ -85,15 +85,16 @@ static ClusterState unfollow(String followerIndex, ClusterState current) { if (persistentTask.getTaskName().equals(ShardFollowTask.NAME)) { ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams(); if (shardFollowTask.getFollowShardId().getIndexName().equals(followerIndex)) { - throw new IllegalArgumentException("cannot unfollow index [" + followerIndex + - "], because it has not been paused"); + throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex + + "] to a non-follower, because it has not been paused"); } } } } if (followerIMD.getState() != IndexMetaData.State.CLOSE) { - throw new IllegalArgumentException("cannot unfollow index [" + followerIndex + "], because it has not been closed"); + throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex + + "] to a non-follower, because it has not been closed"); } IndexMetaData.Builder newIMD = IndexMetaData.builder(followerIMD); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java index b2624839d8728..1240b37e31287 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportUnfollowActionTests.java @@ -59,7 +59,8 @@ public void testUnfollowIndexOpen() { .build()) .build(); Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current)); - assertThat(e.getMessage(), equalTo("cannot unfollow index [follow_index], because it has not been closed")); + assertThat(e.getMessage(), + equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been closed")); } public void testUnfollowRunningShardFollowTasks() { @@ -95,7 +96,8 @@ public void testUnfollowRunningShardFollowTasks() { .build()) .build(); Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current)); - assertThat(e.getMessage(), equalTo("cannot unfollow index [follow_index], because it has not been paused")); + assertThat(e.getMessage(), + equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been paused")); } }