Skip to content

Commit 6f73abb

Browse files
committed
[CCR] Add unfollow API (#34132)
The unfollow API changes a follower index into a regular index, so that it will accept write requests from clients. For the unfollow api to work the index follow needs to be stopped and the index needs to be closed. Closes #33931
1 parent d225bf9 commit 6f73abb

File tree

10 files changed

+425
-0
lines changed

10 files changed

+425
-0
lines changed

server/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,10 @@ public Builder putCustom(String type, Map<String, String> customIndexMetaData) {
964964
return this;
965965
}
966966

967+
public Map<String, String> removeCustom(String type) {
968+
return this.customMetaData.remove(type);
969+
}
970+
967971
public Set<String> getInSyncAllocationIds(int shardId) {
968972
return inSyncAllocationIds.get(shardId);
969973
}

x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,3 +40,13 @@
4040
ccr.pause_follow:
4141
index: bar
4242
- is_true: acknowledged
43+
44+
- do:
45+
indices.close:
46+
index: bar
47+
- is_true: acknowledged
48+
49+
- do:
50+
ccr.unfollow:
51+
index: bar
52+
- is_true: acknowledged

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,11 @@
4141
import org.elasticsearch.watcher.ResourceWatcherService;
4242
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
4343
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
44+
import org.elasticsearch.xpack.ccr.action.TransportUnfollowAction;
4445
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
4546
import org.elasticsearch.xpack.ccr.action.TransportAutoFollowStatsAction;
4647
import org.elasticsearch.xpack.ccr.rest.RestAutoFollowStatsAction;
48+
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
4749
import org.elasticsearch.xpack.core.ccr.action.AutoFollowStatsAction;
4850
import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction;
4951
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
@@ -73,6 +75,7 @@
7375
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
7476
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
7577
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
78+
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
7679

7780
import java.util.Arrays;
7881
import java.util.Collection;
@@ -170,6 +173,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
170173
new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class),
171174
new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class),
172175
new ActionHandler<>(PauseFollowAction.INSTANCE, TransportPauseFollowAction.class),
176+
new ActionHandler<>(UnfollowAction.INSTANCE, TransportUnfollowAction.class),
173177
// auto-follow actions
174178
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
175179
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
@@ -192,6 +196,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
192196
new RestPutFollowAction(settings, restController),
193197
new RestResumeFollowAction(settings, restController),
194198
new RestPauseFollowAction(settings, restController),
199+
new RestUnfollowAction(settings, restController),
195200
// auto-follow APIs
196201
new RestDeleteAutoFollowPatternAction(settings, restController),
197202
new RestPutAutoFollowPatternAction(settings, restController),
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.support.ActionFilters;
11+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
13+
import org.elasticsearch.cluster.ClusterState;
14+
import org.elasticsearch.cluster.ClusterStateUpdateTask;
15+
import org.elasticsearch.cluster.block.ClusterBlockException;
16+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
17+
import org.elasticsearch.cluster.metadata.IndexMetaData;
18+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
19+
import org.elasticsearch.cluster.metadata.MetaData;
20+
import org.elasticsearch.cluster.service.ClusterService;
21+
import org.elasticsearch.common.inject.Inject;
22+
import org.elasticsearch.common.settings.Settings;
23+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
24+
import org.elasticsearch.threadpool.ThreadPool;
25+
import org.elasticsearch.transport.TransportService;
26+
import org.elasticsearch.xpack.ccr.Ccr;
27+
import org.elasticsearch.xpack.ccr.CcrSettings;
28+
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
29+
30+
public class TransportUnfollowAction extends TransportMasterNodeAction<UnfollowAction.Request, AcknowledgedResponse> {
31+
32+
@Inject
33+
public TransportUnfollowAction(Settings settings, TransportService transportService, ClusterService clusterService,
34+
ThreadPool threadPool, ActionFilters actionFilters,
35+
IndexNameExpressionResolver indexNameExpressionResolver) {
36+
super(settings, UnfollowAction.NAME, transportService, clusterService, threadPool, actionFilters,
37+
indexNameExpressionResolver, UnfollowAction.Request::new);
38+
}
39+
40+
@Override
41+
protected String executor() {
42+
return ThreadPool.Names.SAME;
43+
}
44+
45+
@Override
46+
protected AcknowledgedResponse newResponse() {
47+
return new AcknowledgedResponse();
48+
}
49+
50+
@Override
51+
protected void masterOperation(UnfollowAction.Request request,
52+
ClusterState state,
53+
ActionListener<AcknowledgedResponse> listener) throws Exception {
54+
clusterService.submitStateUpdateTask("unfollow_action", new ClusterStateUpdateTask() {
55+
56+
@Override
57+
public ClusterState execute(ClusterState current) throws Exception {
58+
String followerIndex = request.getFollowerIndex();
59+
return unfollow(followerIndex, current);
60+
}
61+
62+
@Override
63+
public void onFailure(String source, Exception e) {
64+
listener.onFailure(e);
65+
}
66+
67+
@Override
68+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
69+
listener.onResponse(new AcknowledgedResponse(true));
70+
}
71+
});
72+
}
73+
74+
@Override
75+
protected ClusterBlockException checkBlock(UnfollowAction.Request request, ClusterState state) {
76+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
77+
}
78+
79+
static ClusterState unfollow(String followerIndex, ClusterState current) {
80+
IndexMetaData followerIMD = current.metaData().index(followerIndex);
81+
82+
PersistentTasksCustomMetaData persistentTasks = current.metaData().custom(PersistentTasksCustomMetaData.TYPE);
83+
if (persistentTasks != null) {
84+
for (PersistentTasksCustomMetaData.PersistentTask<?> persistentTask : persistentTasks.tasks()) {
85+
if (persistentTask.getTaskName().equals(ShardFollowTask.NAME)) {
86+
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
87+
if (shardFollowTask.getFollowShardId().getIndexName().equals(followerIndex)) {
88+
throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex +
89+
"] to a non-follower, because it has not been paused");
90+
}
91+
}
92+
}
93+
}
94+
95+
if (followerIMD.getState() != IndexMetaData.State.CLOSE) {
96+
throw new IllegalArgumentException("cannot convert the follower index [" + followerIndex +
97+
"] to a non-follower, because it has not been closed");
98+
}
99+
100+
IndexMetaData.Builder newIMD = IndexMetaData.builder(followerIMD);
101+
// Remove index.xpack.ccr.following_index setting
102+
Settings.Builder builder = Settings.builder();
103+
builder.put(followerIMD.getSettings());
104+
builder.remove(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey());
105+
106+
newIMD.settings(builder);
107+
// Remove ccr custom metadata
108+
newIMD.removeCustom(Ccr.CCR_CUSTOM_METADATA_KEY);
109+
110+
MetaData newMetaData = MetaData.builder(current.metaData())
111+
.put(newIMD)
112+
.build();
113+
return ClusterState.builder(current)
114+
.metaData(newMetaData)
115+
.build();
116+
}
117+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.rest;
8+
9+
import org.elasticsearch.client.node.NodeClient;
10+
import org.elasticsearch.common.settings.Settings;
11+
import org.elasticsearch.rest.BaseRestHandler;
12+
import org.elasticsearch.rest.RestController;
13+
import org.elasticsearch.rest.RestRequest;
14+
import org.elasticsearch.rest.action.RestToXContentListener;
15+
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
16+
17+
import java.io.IOException;
18+
19+
import static org.elasticsearch.xpack.core.ccr.action.UnfollowAction.INSTANCE;
20+
21+
public class RestUnfollowAction extends BaseRestHandler {
22+
23+
public RestUnfollowAction(Settings settings, RestController controller) {
24+
super(settings);
25+
controller.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/unfollow", this);
26+
}
27+
28+
@Override
29+
public String getName() {
30+
return "ccr_unfollow_action";
31+
}
32+
33+
@Override
34+
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
35+
UnfollowAction.Request request = new UnfollowAction.Request(restRequest.param("index"));
36+
return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel));
37+
}
38+
39+
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.index.IndexRequest;
2424
import org.elasticsearch.action.search.SearchRequest;
2525
import org.elasticsearch.action.search.SearchResponse;
26+
import org.elasticsearch.action.support.WriteRequest;
2627
import org.elasticsearch.action.support.ActiveShardCount;
2728
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
2829
import org.elasticsearch.cluster.ClusterState;
@@ -62,6 +63,7 @@
6263
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
6364
import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction;
6465
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
66+
import org.elasticsearch.xpack.core.ccr.action.UnfollowAction;
6567

6668
import java.io.IOException;
6769
import java.util.Arrays;
@@ -650,6 +652,33 @@ public void testDeleteFollowerIndex() throws Exception {
650652
ensureNoCcrTasks();
651653
}
652654

655+
public void testUnfollowIndex() throws Exception {
656+
String leaderIndexSettings = getIndexSettings(1, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
657+
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON).get());
658+
PutFollowAction.Request followRequest = follow("index1", "index2");
659+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
660+
client().prepareIndex("index1", "doc").setSource("{}", XContentType.JSON).get();
661+
assertBusy(() -> {
662+
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(1L));
663+
});
664+
665+
// Indexing directly into index2 would fail now, because index2 is a follow index.
666+
// We can't test this here because an assertion trips before an actual error is thrown and then index call hangs.
667+
668+
// Turn follow index into a regular index by: pausing shard follow, close index, unfollow index and then open index:
669+
unfollowIndex("index2");
670+
client().admin().indices().close(new CloseIndexRequest("index2")).actionGet();
671+
assertAcked(client().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request("index2")).actionGet());
672+
client().admin().indices().open(new OpenIndexRequest("index2")).actionGet();
673+
ensureGreen("index2");
674+
675+
// Indexing succeeds now, because index2 is no longer a follow index:
676+
client().prepareIndex("index2", "doc").setSource("{}", XContentType.JSON)
677+
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
678+
.get();
679+
assertThat(client().prepareSearch("index2").get().getHits().getTotalHits(), equalTo(2L));
680+
}
681+
653682
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
654683
return () -> {
655684
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.ClusterName;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexMetaData;
13+
import org.elasticsearch.cluster.metadata.MetaData;
14+
import org.elasticsearch.common.unit.TimeValue;
15+
import org.elasticsearch.index.shard.ShardId;
16+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
17+
import org.elasticsearch.test.ESTestCase;
18+
import org.elasticsearch.xpack.ccr.Ccr;
19+
import org.elasticsearch.xpack.ccr.CcrSettings;
20+
21+
import java.util.Collections;
22+
import java.util.HashMap;
23+
24+
import static org.hamcrest.Matchers.equalTo;
25+
import static org.hamcrest.Matchers.nullValue;
26+
27+
public class TransportUnfollowActionTests extends ESTestCase {
28+
29+
public void testUnfollow() {
30+
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
31+
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
32+
.numberOfShards(1)
33+
.numberOfReplicas(0)
34+
.state(IndexMetaData.State.CLOSE)
35+
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
36+
37+
ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
38+
.metaData(MetaData.builder()
39+
.put(followerIndex)
40+
.build())
41+
.build();
42+
ClusterState result = TransportUnfollowAction.unfollow("follow_index", current);
43+
44+
IndexMetaData resultIMD = result.metaData().index("follow_index");
45+
assertThat(resultIMD.getSettings().get(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey()), nullValue());
46+
assertThat(resultIMD.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY), nullValue());
47+
}
48+
49+
public void testUnfollowIndexOpen() {
50+
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
51+
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
52+
.numberOfShards(1)
53+
.numberOfReplicas(0)
54+
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
55+
56+
ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
57+
.metaData(MetaData.builder()
58+
.put(followerIndex)
59+
.build())
60+
.build();
61+
Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current));
62+
assertThat(e.getMessage(),
63+
equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been closed"));
64+
}
65+
66+
public void testUnfollowRunningShardFollowTasks() {
67+
IndexMetaData.Builder followerIndex = IndexMetaData.builder("follow_index")
68+
.settings(settings(Version.CURRENT).put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true))
69+
.numberOfShards(1)
70+
.numberOfReplicas(0)
71+
.state(IndexMetaData.State.CLOSE)
72+
.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
73+
74+
75+
ShardFollowTask params = new ShardFollowTask(
76+
null,
77+
new ShardId("follow_index", "", 0),
78+
new ShardId("leader_index", "", 0),
79+
1024,
80+
1,
81+
TransportResumeFollowAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES,
82+
1,
83+
10240,
84+
TimeValue.timeValueMillis(10),
85+
TimeValue.timeValueMillis(10),
86+
"uuid",
87+
Collections.emptyMap()
88+
);
89+
PersistentTasksCustomMetaData.PersistentTask<?> task =
90+
new PersistentTasksCustomMetaData.PersistentTask<>("id", ShardFollowTask.NAME, params, 0, null);
91+
92+
ClusterState current = ClusterState.builder(new ClusterName("cluster_name"))
93+
.metaData(MetaData.builder()
94+
.put(followerIndex)
95+
.putCustom(PersistentTasksCustomMetaData.TYPE, new PersistentTasksCustomMetaData(0, Collections.singletonMap("id", task)))
96+
.build())
97+
.build();
98+
Exception e = expectThrows(IllegalArgumentException.class, () -> TransportUnfollowAction.unfollow("follow_index", current));
99+
assertThat(e.getMessage(),
100+
equalTo("cannot convert the follower index [follow_index] to a non-follower, because it has not been paused"));
101+
}
102+
103+
}

0 commit comments

Comments
 (0)