Skip to content

Commit a79b07a

Browse files
committed
Add Pause/Resume Auto Follower APIs (#47510)
This commit adds two APIs that allow pausing and resuming CCR auto-follow patterns: // pause auto-follower POST /_ccr/auto_follow/my_pattern/pause // resume auto-follower POST /_ccr/auto_follow/my_pattern/resume The ability to pause and resume auto-follow patterns can be useful in some situations, including rolling upgrades of clusters that use a bi-directional cross-cluster replication scheme (see #46665). This commit adds a new active flag to the AutoFollowPattern and adapts the AutoFollowCoordinator and AutoFollower classes so that they stop fetching the remote cluster's state when all auto-follow patterns associated with the remote cluster are paused. When an auto-follow pattern is paused, remote indices that match the pattern are simply ignored: they are not added to the pattern's list of followed index UUIDs that is maintained in the local cluster state. This way, when the auto-follow pattern is resumed, the indices created in the remote cluster in the meantime will be picked up and added as new following indices. Indices created and then deleted in the remote cluster while the pattern was paused will be ignored, as they won't be seen at all by the auto-follow pattern at resume time. Backport of #47510 for 7.x
1 parent 65717f6 commit a79b07a

24 files changed

+1166
-86
lines changed

client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT
4949
String remoteCluster = randomAlphaOfLength(4);
5050
List<String> leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4));
5151
String followIndexNamePattern = randomAlphaOfLength(4);
52+
boolean active = randomBoolean();
5253

5354
Integer maxOutstandingReadRequests = null;
5455
if (randomBoolean()) {
@@ -91,7 +92,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT
9192
readPollTimeout = new TimeValue(randomNonNegativeLong());
9293
}
9394
patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters,
94-
followIndexNamePattern, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
95+
followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests,
9596
maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize,
9697
maxRetryDelay, readPollTimeout));
9798
}

docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ The API returns the following result:
9090
{
9191
"name": "my_auto_follow_pattern",
9292
"pattern": {
93+
"active": true,
9394
"remote_cluster" : "remote_cluster",
9495
"leader_index_patterns" :
9596
[

x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,89 @@
5252
catch: missing
5353
ccr.get_auto_follow_pattern:
5454
name: my_pattern
55+
56+
---
57+
"Test pause and resume auto follow pattern":
58+
- skip:
59+
version: " - 7.4.99"
60+
reason: "pause/resume auto-follow patterns is supported since 7.5.0"
61+
62+
- do:
63+
cluster.state: {}
64+
65+
- set: {master_node: master}
66+
67+
- do:
68+
nodes.info: {}
69+
70+
- set: {nodes.$master.transport_address: local_ip}
71+
72+
- do:
73+
cluster.put_settings:
74+
body:
75+
transient:
76+
cluster.remote.local.seeds: $local_ip
77+
flat_settings: true
78+
79+
- match: {transient: {cluster.remote.local.seeds: $local_ip}}
80+
81+
- do:
82+
ccr.put_auto_follow_pattern:
83+
name: pattern_test
84+
body:
85+
remote_cluster: local
86+
leader_index_patterns: ['logs-*']
87+
max_outstanding_read_requests: 2
88+
- is_true: acknowledged
89+
90+
- do:
91+
ccr.get_auto_follow_pattern:
92+
name: pattern_test
93+
- match: { patterns.0.name: 'pattern_test' }
94+
- match: { patterns.0.pattern.remote_cluster: 'local' }
95+
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
96+
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
97+
- match: { patterns.0.pattern.active: true }
98+
99+
- do:
100+
catch: missing
101+
ccr.pause_auto_follow_pattern:
102+
name: unknown_pattern
103+
104+
- do:
105+
ccr.pause_auto_follow_pattern:
106+
name: pattern_test
107+
- is_true: acknowledged
108+
109+
- do:
110+
ccr.get_auto_follow_pattern:
111+
name: pattern_test
112+
- match: { patterns.0.name: 'pattern_test' }
113+
- match: { patterns.0.pattern.remote_cluster: 'local' }
114+
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
115+
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
116+
- match: { patterns.0.pattern.active: false }
117+
118+
- do:
119+
catch: missing
120+
ccr.resume_auto_follow_pattern:
121+
name: unknown_pattern
122+
123+
- do:
124+
ccr.resume_auto_follow_pattern:
125+
name: pattern_test
126+
- is_true: acknowledged
127+
128+
- do:
129+
ccr.get_auto_follow_pattern:
130+
name: pattern_test
131+
- match: { patterns.0.name: 'pattern_test' }
132+
- match: { patterns.0.pattern.remote_cluster: 'local' }
133+
- match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] }
134+
- match: { patterns.0.pattern.max_outstanding_read_requests: 2 }
135+
- match: { patterns.0.pattern.active: true }
136+
137+
- do:
138+
ccr.delete_auto_follow_pattern:
139+
name: pattern_test
140+
- is_true: acknowledged

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction;
6262
import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction;
6363
import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction;
64+
import org.elasticsearch.xpack.ccr.action.TransportActivateAutoFollowPatternAction;
6465
import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction;
6566
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
6667
import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction;
@@ -82,9 +83,11 @@
8283
import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction;
8384
import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction;
8485
import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction;
86+
import org.elasticsearch.xpack.ccr.rest.RestPauseAutoFollowPatternAction;
8587
import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction;
8688
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
8789
import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction;
90+
import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction;
8891
import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction;
8992
import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction;
9093
import org.elasticsearch.xpack.core.XPackPlugin;
@@ -97,6 +100,7 @@
97100
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;
98101
import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction;
99102
import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction;
103+
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
100104
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
101105
import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction;
102106
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
@@ -236,6 +240,7 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
236240
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
237241
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class),
238242
new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class),
243+
new ActionHandler<>(ActivateAutoFollowPatternAction.INSTANCE, TransportActivateAutoFollowPatternAction.class),
239244
// forget follower action
240245
new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class));
241246
}
@@ -262,6 +267,8 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
262267
new RestDeleteAutoFollowPatternAction(restController),
263268
new RestPutAutoFollowPatternAction(restController),
264269
new RestGetAutoFollowPatternAction(restController),
270+
new RestPauseAutoFollowPatternAction(restController),
271+
new RestResumeAutoFollowPatternAction(restController),
265272
// forget follower API
266273
new RestForgetFollowerAction(restController));
267274
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ synchronized void updateStats(List<AutoFollowResult> results) {
194194
}
195195

196196
void updateAutoFollowers(ClusterState followerClusterState) {
197-
AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
197+
final AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE);
198198
if (autoFollowMetadata == null) {
199199
return;
200200
}
@@ -206,8 +206,9 @@ void updateAutoFollowers(ClusterState followerClusterState) {
206206
}
207207

208208
final CopyOnWriteHashMap<String, AutoFollower> autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers);
209-
Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream()
210-
.map(entry -> entry.getValue().getRemoteCluster())
209+
Set<String> newRemoteClusters = autoFollowMetadata.getPatterns().values().stream()
210+
.filter(AutoFollowPattern::isActive)
211+
.map(AutoFollowPattern::getRemoteCluster)
211212
.filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false)
212213
.collect(Collectors.toSet());
213214

@@ -283,6 +284,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
283284
String remoteCluster = entry.getKey();
284285
AutoFollower autoFollower = entry.getValue();
285286
boolean exist = autoFollowMetadata.getPatterns().values().stream()
287+
.filter(AutoFollowPattern::isActive)
286288
.anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster));
287289
if (exist == false) {
288290
LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster);
@@ -345,6 +347,7 @@ abstract static class AutoFollower {
345347
private volatile CountDown autoFollowPatternsCountDown;
346348
private volatile AtomicArray<AutoFollowResult> autoFollowResults;
347349
private volatile boolean stop;
350+
private volatile List<String> lastActivePatterns = Collections.emptyList();
348351

349352
AutoFollower(final String remoteCluster,
350353
final Consumer<List<AutoFollowResult>> statsUpdater,
@@ -384,7 +387,9 @@ void start() {
384387

385388
final List<String> patterns = autoFollowMetadata.getPatterns().entrySet().stream()
386389
.filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster))
390+
.filter(entry -> entry.getValue().isActive())
387391
.map(Map.Entry::getKey)
392+
.sorted()
388393
.collect(Collectors.toList());
389394
if (patterns.isEmpty()) {
390395
LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster);
@@ -394,8 +399,15 @@ void start() {
394399
this.autoFollowPatternsCountDown = new CountDown(patterns.size());
395400
this.autoFollowResults = new AtomicArray<>(patterns.size());
396401

402+
// keep the list of the last known active patterns for this auto-follower
403+
// if the list changed, we explicitly retrieve the last cluster state in
404+
// order to avoid timeouts when waiting for the next remote cluster state
405+
// version that might never arrive
406+
final long nextMetadataVersion = Objects.equals(patterns, lastActivePatterns) ? metadataVersion + 1 : metadataVersion;
407+
this.lastActivePatterns = Collections.unmodifiableList(patterns);
408+
397409
final Thread thread = Thread.currentThread();
398-
getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> {
410+
getRemoteClusterState(remoteCluster, Math.max(1L, nextMetadataVersion), (remoteClusterStateResponse, remoteError) -> {
399411
// Also check removed flag here, as it may take a while for this remote cluster state api call to return:
400412
if (removed) {
401413
LOGGER.info("AutoFollower instance for cluster [{}] has been removed", remoteCluster);
@@ -445,8 +457,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata,
445457
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName);
446458
List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName);
447459

448-
final List<Index> leaderIndicesToFollow =
449-
getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
460+
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices);
450461
if (leaderIndicesToFollow.isEmpty()) {
451462
finalise(slot, new AutoFollowResult(autoFollowPatternName), thread);
452463
} else {
@@ -599,7 +610,7 @@ static List<Index> getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern,
599610
if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) {
600611
continue;
601612
}
602-
if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
613+
if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) {
603614
IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex());
604615
if (indexRoutingTable != null &&
605616
// Leader indices can be in the cluster state, but not all primary shards may be ready yet.
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.ccr.action;
7+
8+
import org.elasticsearch.ResourceNotFoundException;
9+
import org.elasticsearch.action.ActionListener;
10+
import org.elasticsearch.action.support.ActionFilters;
11+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
12+
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
13+
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
14+
import org.elasticsearch.cluster.ClusterState;
15+
import org.elasticsearch.cluster.block.ClusterBlockException;
16+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
17+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
18+
import org.elasticsearch.cluster.metadata.MetaData;
19+
import org.elasticsearch.cluster.service.ClusterService;
20+
import org.elasticsearch.common.inject.Inject;
21+
import org.elasticsearch.common.io.stream.StreamInput;
22+
import org.elasticsearch.threadpool.ThreadPool;
23+
import org.elasticsearch.transport.TransportService;
24+
import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata;
25+
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction;
26+
import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request;
27+
28+
import java.io.IOException;
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
public class TransportActivateAutoFollowPatternAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
33+
34+
@Inject
35+
public TransportActivateAutoFollowPatternAction(TransportService transportService, ClusterService clusterService,
36+
ThreadPool threadPool, ActionFilters actionFilters,
37+
IndexNameExpressionResolver resolver) {
38+
super(ActivateAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, resolver);
39+
}
40+
41+
@Override
42+
protected String executor() {
43+
return ThreadPool.Names.SAME;
44+
}
45+
46+
@Override
47+
protected AcknowledgedResponse read(final StreamInput in) throws IOException {
48+
return new AcknowledgedResponse(in);
49+
}
50+
51+
@Override
52+
protected ClusterBlockException checkBlock(final Request request, final ClusterState state) {
53+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
54+
}
55+
56+
@Override
57+
protected void masterOperation(ActivateAutoFollowPatternAction.Request request,
58+
ClusterState state,
59+
ActionListener<AcknowledgedResponse> listener) throws Exception {
60+
clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(),
61+
new AckedClusterStateUpdateTask<AcknowledgedResponse>(request, listener) {
62+
63+
@Override
64+
protected AcknowledgedResponse newResponse(final boolean acknowledged) {
65+
return new AcknowledgedResponse(acknowledged);
66+
}
67+
68+
@Override
69+
public ClusterState execute(final ClusterState currentState) throws Exception {
70+
return innerActivate(request, currentState);
71+
}
72+
});
73+
}
74+
75+
static ClusterState innerActivate(final Request request, ClusterState currentState) {
76+
final AutoFollowMetadata autoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE);
77+
if (autoFollowMetadata == null) {
78+
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName());
79+
}
80+
81+
final Map<String, AutoFollowMetadata.AutoFollowPattern> patterns = autoFollowMetadata.getPatterns();
82+
final AutoFollowMetadata.AutoFollowPattern previousAutoFollowPattern = patterns.get(request.getName());
83+
if (previousAutoFollowPattern == null) {
84+
throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName());
85+
}
86+
87+
if (previousAutoFollowPattern.isActive() == request.isActive()) {
88+
return currentState;
89+
}
90+
91+
final Map<String, AutoFollowMetadata.AutoFollowPattern> newPatterns = new HashMap<>(patterns);
92+
newPatterns.put(request.getName(),
93+
new AutoFollowMetadata.AutoFollowPattern(
94+
previousAutoFollowPattern.getRemoteCluster(),
95+
previousAutoFollowPattern.getLeaderIndexPatterns(),
96+
previousAutoFollowPattern.getFollowIndexPattern(),
97+
request.isActive(),
98+
previousAutoFollowPattern.getMaxReadRequestOperationCount(),
99+
previousAutoFollowPattern.getMaxWriteRequestOperationCount(),
100+
previousAutoFollowPattern.getMaxOutstandingReadRequests(),
101+
previousAutoFollowPattern.getMaxOutstandingWriteRequests(),
102+
previousAutoFollowPattern.getMaxReadRequestSize(),
103+
previousAutoFollowPattern.getMaxWriteRequestSize(),
104+
previousAutoFollowPattern.getMaxWriteBufferCount(),
105+
previousAutoFollowPattern.getMaxWriteBufferSize(),
106+
previousAutoFollowPattern.getMaxRetryDelay(),
107+
previousAutoFollowPattern.getReadPollTimeout()));
108+
109+
return ClusterState.builder(currentState)
110+
.metaData(MetaData.builder(currentState.getMetaData())
111+
.putCustom(AutoFollowMetadata.TYPE,
112+
new AutoFollowMetadata(newPatterns, autoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getHeaders()))
113+
.build())
114+
.build();
115+
}
116+
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
160160
request.getRemoteCluster(),
161161
request.getLeaderIndexPatterns(),
162162
request.getFollowIndexNamePattern(),
163+
true,
163164
request.getParameters().getMaxReadRequestOperationCount(),
164165
request.getParameters().getMaxWriteRequestOperationCount(),
165166
request.getParameters().getMaxOutstandingReadRequests(),

0 commit comments

Comments
 (0)