Skip to content

Commit a721d09

Browse files
authored
[CCR] Added auto follow patterns feature (#33118)
Auto Following Patterns is a cross cluster replication feature that keeps track whether in the leader cluster indices are being created with names that match with a specific pattern and if so automatically let the follower cluster follow these newly created indices. This change adds an `AutoFollowCoordinator` component that is only active on the elected master node. Periodically this component checks the the cluster state of remote clusters if there new leader indices that match with configured auto follow patterns that have been defined in `AutoFollowMetadata` custom metadata. This change also adds two new APIs to manage auto follow patterns. A put auto follow pattern api: ``` PUT /_ccr/_autofollow/{{remote_cluster}} { "leader_index_pattern": ["logs-*", ...], "follow_index_pattern": "{{leader_index}}-copy", "max_concurrent_read_batches": 2 ... // other optional parameters } ``` and delete auto follow pattern api: ``` DELETE /_ccr/_autofollow/{{remote_cluster_alias}} ``` The auto follow patterns are directly tied to the remote cluster aliases configured in the follow cluster. Relates to #33007 Co-authored-by: Jason Tedor [email protected]
1 parent d71ced1 commit a721d09

File tree

23 files changed

+2380
-6
lines changed

23 files changed

+2380
-6
lines changed

server/src/main/java/org/elasticsearch/common/regex/Regex.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,15 @@ public static boolean simpleMatch(String[] patterns, String str) {
138138
return false;
139139
}
140140

141+
/**
142+
* Similar to {@link #simpleMatch(String[], String)}, but accepts a list of strings instead of an array of strings for the patterns to
143+
* match.
144+
*/
145+
public static boolean simpleMatch(final List<String> patterns, final String str) {
146+
// #simpleMatch(String[], String) is likely to be inlined into this method
147+
return patterns != null && simpleMatch(patterns.toArray(Strings.EMPTY_ARRAY), str);
148+
}
149+
141150
public static boolean simpleMatch(String[] patterns, String[] types) {
142151
if (patterns != null && types != null) {
143152
for (String type : types) {

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,34 @@ public void testFollowIndex() throws Exception {
7878
}
7979
}
8080

81+
public void testAutoFollowPatterns() throws Exception {
82+
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
83+
84+
Request request = new Request("PUT", "/_ccr/_auto_follow/leader_cluster");
85+
request.setJsonEntity("{\"leader_index_patterns\": [\"logs-*\"]}");
86+
assertOK(client().performRequest(request));
87+
88+
try (RestClient leaderClient = buildLeaderClient()) {
89+
Settings settings = Settings.builder()
90+
.put("index.soft_deletes.enabled", true)
91+
.build();
92+
request = new Request("PUT", "/logs-20190101");
93+
request.setJsonEntity("{\"settings\": " + Strings.toString(settings) +
94+
", \"mappings\": {\"_doc\": {\"properties\": {\"field\": {\"type\": \"keyword\"}}}} }");
95+
assertOK(leaderClient.performRequest(request));
96+
97+
for (int i = 0; i < 5; i++) {
98+
String id = Integer.toString(i);
99+
index(leaderClient, "logs-20190101", id, "field", i, "filtered_field", "true");
100+
}
101+
}
102+
103+
assertBusy(() -> {
104+
ensureYellow("logs-20190101");
105+
verifyDocuments("logs-20190101", 5);
106+
});
107+
}
108+
81109
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
82110
XContentBuilder document = jsonBuilder().startObject();
83111
for (int i = 0; i < fields.length; i += 2) {
@@ -135,6 +163,15 @@ private static Map<String, Object> toMap(String response) {
135163
return XContentHelper.convertToMap(JsonXContent.jsonXContent, response, false);
136164
}
137165

166+
private static void ensureYellow(String index) throws IOException {
167+
Request request = new Request("GET", "/_cluster/health/" + index);
168+
request.addParameter("wait_for_status", "yellow");
169+
request.addParameter("wait_for_no_relocating_shards", "true");
170+
request.addParameter("timeout", "70s");
171+
request.addParameter("level", "shards");
172+
client().performRequest(request);
173+
}
174+
138175
private RestClient buildLeaderClient() throws IOException {
139176
assert runningAgainstLeaderCluster == false;
140177
String leaderUrl = System.getProperty("tests.leader_host");

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,21 +39,28 @@
3939
import org.elasticsearch.threadpool.FixedExecutorBuilder;
4040
import org.elasticsearch.threadpool.ThreadPool;
4141
import org.elasticsearch.watcher.ResourceWatcherService;
42+
import org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator;
4243
import org.elasticsearch.xpack.ccr.action.CcrStatsAction;
4344
import org.elasticsearch.xpack.ccr.action.CreateAndFollowIndexAction;
45+
import org.elasticsearch.xpack.ccr.action.DeleteAutoFollowPatternAction;
4446
import org.elasticsearch.xpack.ccr.action.FollowIndexAction;
47+
import org.elasticsearch.xpack.ccr.action.PutAutoFollowPatternAction;
4548
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
4649
import org.elasticsearch.xpack.ccr.action.ShardFollowNodeTask;
4750
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
4851
import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor;
4952
import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction;
53+
import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction;
54+
import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction;
5055
import org.elasticsearch.xpack.ccr.action.UnfollowIndexAction;
5156
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsAction;
5257
import org.elasticsearch.xpack.ccr.action.bulk.TransportBulkShardOperationsAction;
5358
import org.elasticsearch.xpack.ccr.index.engine.FollowingEngineFactory;
5459
import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction;
5560
import org.elasticsearch.xpack.ccr.rest.RestCreateAndFollowIndexAction;
61+
import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction;
5662
import org.elasticsearch.xpack.ccr.rest.RestFollowIndexAction;
63+
import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction;
5764
import org.elasticsearch.xpack.ccr.rest.RestUnfollowIndexAction;
5865
import org.elasticsearch.xpack.core.XPackPlugin;
5966

@@ -113,7 +120,14 @@ public Collection<Object> createComponents(
113120
final Environment environment,
114121
final NodeEnvironment nodeEnvironment,
115122
final NamedWriteableRegistry namedWriteableRegistry) {
116-
return Collections.singleton(ccrLicenseChecker);
123+
if (enabled == false) {
124+
return emptyList();
125+
}
126+
127+
return Arrays.asList(
128+
ccrLicenseChecker,
129+
new AutoFollowCoordinator(settings, client, threadPool, clusterService)
130+
);
117131
}
118132

119133
@Override
@@ -128,23 +142,34 @@ public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterServic
128142
}
129143

130144
return Arrays.asList(
145+
// internal actions
131146
new ActionHandler<>(BulkShardOperationsAction.INSTANCE, TransportBulkShardOperationsAction.class),
147+
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
148+
// stats action
132149
new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class),
150+
// follow actions
133151
new ActionHandler<>(CreateAndFollowIndexAction.INSTANCE, CreateAndFollowIndexAction.TransportAction.class),
134152
new ActionHandler<>(FollowIndexAction.INSTANCE, FollowIndexAction.TransportAction.class),
135-
new ActionHandler<>(ShardChangesAction.INSTANCE, ShardChangesAction.TransportAction.class),
136-
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class));
153+
new ActionHandler<>(UnfollowIndexAction.INSTANCE, UnfollowIndexAction.TransportAction.class),
154+
// auto-follow actions
155+
new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class),
156+
new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class));
137157
}
138158

139159
public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings,
140160
IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter,
141161
IndexNameExpressionResolver indexNameExpressionResolver,
142162
Supplier<DiscoveryNodes> nodesInCluster) {
143163
return Arrays.asList(
164+
// stats API
144165
new RestCcrStatsAction(settings, restController),
166+
// follow APIs
145167
new RestCreateAndFollowIndexAction(settings, restController),
146168
new RestFollowIndexAction(settings, restController),
147-
new RestUnfollowIndexAction(settings, restController));
169+
new RestUnfollowIndexAction(settings, restController),
170+
// auto-follow APIs
171+
new RestDeleteAutoFollowPatternAction(settings, restController),
172+
new RestPutAutoFollowPatternAction(settings, restController));
148173
}
149174

150175
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrSettings.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.common.settings.Setting;
99
import org.elasticsearch.common.settings.Setting.Property;
10+
import org.elasticsearch.common.unit.TimeValue;
1011

1112
import java.util.Arrays;
1213
import java.util.List;
@@ -32,6 +33,12 @@ private CcrSettings() {
3233
public static final Setting<Boolean> CCR_FOLLOWING_INDEX_SETTING =
3334
Setting.boolSetting("index.xpack.ccr.following_index", false, Setting.Property.IndexScope);
3435

36+
/**
37+
* Setting for controlling the interval in between polling leader clusters to check whether there are indices to follow
38+
*/
39+
public static final Setting<TimeValue> CCR_AUTO_FOLLOW_POLL_INTERVAL =
40+
Setting.timeSetting("xpack.ccr.auto_follow.poll_interval", TimeValue.timeValueMillis(2500), Property.NodeScope);
41+
3542
/**
3643
* The settings defined by CCR.
3744
*
@@ -40,7 +47,8 @@ private CcrSettings() {
4047
static List<Setting<?>> getSettings() {
4148
return Arrays.asList(
4249
CCR_ENABLED_SETTING,
43-
CCR_FOLLOWING_INDEX_SETTING);
50+
CCR_FOLLOWING_INDEX_SETTING,
51+
CCR_AUTO_FOLLOW_POLL_INTERVAL);
4452
}
4553

4654
}

0 commit comments

Comments
 (0)