From a79b07ac6a4196bcdb7865f8483821a89678b350 Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Fri, 11 Oct 2019 12:33:47 +0200 Subject: [PATCH] Add Pause/Resume Auto Follower APIs (#47510) This commit adds two APIs that allow to pause and resume CCR auto-follower patterns: // pause auto-follower POST /_ccr/auto_follow/my_pattern/pause // resume auto-follower POST /_ccr/auto_follow/my_pattern/resume The ability to pause and resume auto-follow patterns can be useful in some situations, including the rolling upgrades of cluster using a bi-directional cross-cluster replication scheme (see #46665). This committ adds a new active flag to the AutoFollowPattern and adapts the AutoCoordinator and AutoFollower classes so that it stops to fetch remote's cluster state when all auto-follow patterns associate to the remote cluster are paused. When an auto-follower is paused, remote indices that match the pattern are just ignored: they are not added to the pattern's followed indices uids list that is maintained in the local cluster state. This way, when the auto-follow pattern is resumed the indices created in the remote cluster in the meantime will be picked up again and added as new following indices. Indices created and then deleted in the remote cluster will be ignored as they won't be seen at all by the auto-follower pattern at resume time. Backport of #47510 for 7.x --- .../GetAutoFollowPatternResponseTests.java | 3 +- .../get-auto-follow-pattern.asciidoc | 1 + .../rest-api-spec/test/ccr/auto_follow.yml | 86 ++++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 7 + .../ccr/action/AutoFollowCoordinator.java | 25 +- ...nsportActivateAutoFollowPatternAction.java | 116 +++++ .../TransportPutAutoFollowPatternAction.java | 1 + .../RestPauseAutoFollowPatternAction.java | 33 ++ .../RestResumeAutoFollowPatternAction.java | 33 ++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 186 +++++++- .../xpack/ccr/AutoFollowMetadataTests.java | 2 +- .../xpack/ccr/CCRFeatureSetTests.java | 2 +- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 2 +- ...teAutoFollowPatternActionRequestTests.java | 39 ++ .../action/AutoFollowCoordinatorTests.java | 429 +++++++++++++++--- .../GetAutoFollowPatternResponseTests.java | 2 +- ...tActivateAutoFollowPatternActionTests.java | 96 ++++ ...ortDeleteAutoFollowPatternActionTests.java | 12 +- ...nsportGetAutoFollowPatternActionTests.java | 4 +- ...nsportPutAutoFollowPatternActionTests.java | 2 +- .../xpack/core/ccr/AutoFollowMetadata.java | 39 +- .../ActivateAutoFollowPatternAction.java | 84 ++++ .../api/ccr.pause_auto_follow_pattern.json | 24 + .../api/ccr.resume_auto_follow_pattern.json | 24 + 24 files changed, 1166 insertions(+), 86 deletions(-) create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java index 820640635786d..c469647f7eb74 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/GetAutoFollowPatternResponseTests.java @@ -49,6 +49,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT String remoteCluster = randomAlphaOfLength(4); List leaderIndexPatters = Collections.singletonList(randomAlphaOfLength(4)); String followIndexNamePattern = randomAlphaOfLength(4); + boolean active = randomBoolean(); Integer maxOutstandingReadRequests = null; if (randomBoolean()) { @@ -91,7 +92,7 @@ protected GetAutoFollowPatternAction.Response createServerTestInstance(XContentT readPollTimeout = new TimeValue(randomNonNegativeLong()); } patterns.put(randomAlphaOfLength(4), new AutoFollowMetadata.AutoFollowPattern(remoteCluster, leaderIndexPatters, - followIndexNamePattern, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, + followIndexNamePattern, active, maxReadRequestOperationCount, maxWriteRequestOperationCount, maxOutstandingReadRequests, maxOutstandingWriteRequests, maxReadRequestSize, maxWriteRequestSize, maxWriteBufferCount, maxWriteBufferSize, maxRetryDelay, readPollTimeout)); } diff --git a/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc b/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc index ac8f9e4994139..5ea23782e1967 100644 --- a/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc +++ b/docs/reference/ccr/apis/auto-follow/get-auto-follow-pattern.asciidoc @@ -90,6 +90,7 @@ The API returns the following result: { "name": "my_auto_follow_pattern", "pattern": { + "active": true, "remote_cluster" : "remote_cluster", "leader_index_patterns" : [ diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml index ebf9176c30a91..db54e2c38c7cb 100644 --- a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/auto_follow.yml @@ -52,3 +52,89 @@ catch: missing ccr.get_auto_follow_pattern: name: my_pattern + +--- +"Test pause and resume auto follow pattern": + - skip: + version: " - 7.4.99" + reason: "pause/resume auto-follow patterns is supported since 7.5.0" + + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + + - do: + ccr.put_auto_follow_pattern: + name: pattern_test + body: + remote_cluster: local + leader_index_patterns: ['logs-*'] + max_outstanding_read_requests: 2 + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: true } + + - do: + catch: missing + ccr.pause_auto_follow_pattern: + name: unknown_pattern + + - do: + ccr.pause_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: false } + + - do: + catch: missing + ccr.resume_auto_follow_pattern: + name: unknown_pattern + + - do: + ccr.resume_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged + + - do: + ccr.get_auto_follow_pattern: + name: pattern_test + - match: { patterns.0.name: 'pattern_test' } + - match: { patterns.0.pattern.remote_cluster: 'local' } + - match: { patterns.0.pattern.leader_index_patterns: ['logs-*'] } + - match: { patterns.0.pattern.max_outstanding_read_requests: 2 } + - match: { patterns.0.pattern.active: true } + + - do: + ccr.delete_auto_follow_pattern: + name: pattern_test + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index cff4338386e3a..de6f03bddc58c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -61,6 +61,7 @@ import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportActivateAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPutFollowAction; @@ -82,9 +83,11 @@ import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.rest.RestPauseAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPutFollowAction; +import org.elasticsearch.xpack.ccr.rest.RestResumeAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestResumeFollowAction; import org.elasticsearch.xpack.ccr.rest.RestUnfollowAction; import org.elasticsearch.xpack.core.XPackPlugin; @@ -97,6 +100,7 @@ import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; @@ -236,6 +240,7 @@ public List> getPersistentTasksExecutor(ClusterServic new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class), + new ActionHandler<>(ActivateAutoFollowPatternAction.INSTANCE, TransportActivateAutoFollowPatternAction.class), // forget follower action new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class)); } @@ -262,6 +267,8 @@ public List getRestHandlers(Settings settings, RestController restC new RestDeleteAutoFollowPatternAction(restController), new RestPutAutoFollowPatternAction(restController), new RestGetAutoFollowPatternAction(restController), + new RestPauseAutoFollowPatternAction(restController), + new RestResumeAutoFollowPatternAction(restController), // forget follower API new RestForgetFollowerAction(restController)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index cca60579ae6f9..4129b4c018a6d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -194,7 +194,7 @@ synchronized void updateStats(List results) { } void updateAutoFollowers(ClusterState followerClusterState) { - AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + final AutoFollowMetadata autoFollowMetadata = followerClusterState.getMetaData().custom(AutoFollowMetadata.TYPE); if (autoFollowMetadata == null) { return; } @@ -206,8 +206,9 @@ void updateAutoFollowers(ClusterState followerClusterState) { } final CopyOnWriteHashMap autoFollowers = CopyOnWriteHashMap.copyOf(this.autoFollowers); - Set newRemoteClusters = autoFollowMetadata.getPatterns().entrySet().stream() - .map(entry -> entry.getValue().getRemoteCluster()) + Set newRemoteClusters = autoFollowMetadata.getPatterns().values().stream() + .filter(AutoFollowPattern::isActive) + .map(AutoFollowPattern::getRemoteCluster) .filter(remoteCluster -> autoFollowers.containsKey(remoteCluster) == false) .collect(Collectors.toSet()); @@ -283,6 +284,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS String remoteCluster = entry.getKey(); AutoFollower autoFollower = entry.getValue(); boolean exist = autoFollowMetadata.getPatterns().values().stream() + .filter(AutoFollowPattern::isActive) .anyMatch(pattern -> pattern.getRemoteCluster().equals(remoteCluster)); if (exist == false) { LOGGER.info("removing auto-follower for remote cluster [{}]", remoteCluster); @@ -345,6 +347,7 @@ abstract static class AutoFollower { private volatile CountDown autoFollowPatternsCountDown; private volatile AtomicArray autoFollowResults; private volatile boolean stop; + private volatile List lastActivePatterns = Collections.emptyList(); AutoFollower(final String remoteCluster, final Consumer> statsUpdater, @@ -384,7 +387,9 @@ void start() { final List patterns = autoFollowMetadata.getPatterns().entrySet().stream() .filter(entry -> entry.getValue().getRemoteCluster().equals(remoteCluster)) + .filter(entry -> entry.getValue().isActive()) .map(Map.Entry::getKey) + .sorted() .collect(Collectors.toList()); if (patterns.isEmpty()) { LOGGER.info("AutoFollower for cluster [{}] has stopped, because there are no more patterns", remoteCluster); @@ -394,8 +399,15 @@ void start() { this.autoFollowPatternsCountDown = new CountDown(patterns.size()); this.autoFollowResults = new AtomicArray<>(patterns.size()); + // keep the list of the last known active patterns for this auto-follower + // if the list changed, we explicitly retrieve the last cluster state in + // order to avoid timeouts when waiting for the next remote cluster state + // version that might never arrive + final long nextMetadataVersion = Objects.equals(patterns, lastActivePatterns) ? metadataVersion + 1 : metadataVersion; + this.lastActivePatterns = Collections.unmodifiableList(patterns); + final Thread thread = Thread.currentThread(); - getRemoteClusterState(remoteCluster, metadataVersion + 1, (remoteClusterStateResponse, remoteError) -> { + getRemoteClusterState(remoteCluster, Math.max(1L, nextMetadataVersion), (remoteClusterStateResponse, remoteError) -> { // Also check removed flag here, as it may take a while for this remote cluster state api call to return: if (removed) { LOGGER.info("AutoFollower instance for cluster [{}] has been removed", remoteCluster); @@ -445,8 +457,7 @@ private void autoFollowIndices(final AutoFollowMetadata autoFollowMetadata, Map headers = autoFollowMetadata.getHeaders().get(autoFollowPatternName); List followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPatternName); - final List leaderIndicesToFollow = - getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); + final List leaderIndicesToFollow = getLeaderIndicesToFollow(autoFollowPattern, remoteClusterState, followedIndices); if (leaderIndicesToFollow.isEmpty()) { finalise(slot, new AutoFollowResult(autoFollowPatternName), thread); } else { @@ -599,7 +610,7 @@ static List getLeaderIndicesToFollow(AutoFollowPattern autoFollowPattern, if (leaderIndexMetaData.getState() != IndexMetaData.State.OPEN) { continue; } - if (autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { + if (autoFollowPattern.isActive() && autoFollowPattern.match(leaderIndexMetaData.getIndex().getName())) { IndexRoutingTable indexRoutingTable = remoteClusterState.routingTable().index(leaderIndexMetaData.getIndex()); if (indexRoutingTable != null && // Leader indices can be in the cluster state, but not all primary shards may be ready yet. diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java new file mode 100644 index 0000000000000..30e2d7854e34f --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternAction.java @@ -0,0 +1,116 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class TransportActivateAutoFollowPatternAction extends TransportMasterNodeAction { + + @Inject + public TransportActivateAutoFollowPatternAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver resolver) { + super(ActivateAutoFollowPatternAction.NAME, transportService, clusterService, threadPool, actionFilters, Request::new, resolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected AcknowledgedResponse read(final StreamInput in) throws IOException { + return new AcknowledgedResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(final Request request, final ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void masterOperation(ActivateAutoFollowPatternAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + clusterService.submitStateUpdateTask("activate-auto-follow-pattern-" + request.getName(), + new AckedClusterStateUpdateTask(request, listener) { + + @Override + protected AcknowledgedResponse newResponse(final boolean acknowledged) { + return new AcknowledgedResponse(acknowledged); + } + + @Override + public ClusterState execute(final ClusterState currentState) throws Exception { + return innerActivate(request, currentState); + } + }); + } + + static ClusterState innerActivate(final Request request, ClusterState currentState) { + final AutoFollowMetadata autoFollowMetadata = currentState.metaData().custom(AutoFollowMetadata.TYPE); + if (autoFollowMetadata == null) { + throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName()); + } + + final Map patterns = autoFollowMetadata.getPatterns(); + final AutoFollowMetadata.AutoFollowPattern previousAutoFollowPattern = patterns.get(request.getName()); + if (previousAutoFollowPattern == null) { + throw new ResourceNotFoundException("auto-follow pattern [{}] is missing", request.getName()); + } + + if (previousAutoFollowPattern.isActive() == request.isActive()) { + return currentState; + } + + final Map newPatterns = new HashMap<>(patterns); + newPatterns.put(request.getName(), + new AutoFollowMetadata.AutoFollowPattern( + previousAutoFollowPattern.getRemoteCluster(), + previousAutoFollowPattern.getLeaderIndexPatterns(), + previousAutoFollowPattern.getFollowIndexPattern(), + request.isActive(), + previousAutoFollowPattern.getMaxReadRequestOperationCount(), + previousAutoFollowPattern.getMaxWriteRequestOperationCount(), + previousAutoFollowPattern.getMaxOutstandingReadRequests(), + previousAutoFollowPattern.getMaxOutstandingWriteRequests(), + previousAutoFollowPattern.getMaxReadRequestSize(), + previousAutoFollowPattern.getMaxWriteRequestSize(), + previousAutoFollowPattern.getMaxWriteBufferCount(), + previousAutoFollowPattern.getMaxWriteBufferSize(), + previousAutoFollowPattern.getMaxRetryDelay(), + previousAutoFollowPattern.getReadPollTimeout())); + + return ClusterState.builder(currentState) + .metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(newPatterns, autoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getHeaders())) + .build()) + .build(); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java index deb39a97d1c6f..7784f3fe7b052 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java @@ -160,6 +160,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request, request.getRemoteCluster(), request.getLeaderIndexPatterns(), request.getFollowIndexNamePattern(), + true, request.getParameters().getMaxReadRequestOperationCount(), request.getParameters().getMaxWriteRequestOperationCount(), request.getParameters().getMaxOutstandingReadRequests(), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java new file mode 100644 index 0000000000000..abfca00da5cb1 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestPauseAutoFollowPatternAction.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE; + +public class RestPauseAutoFollowPatternAction extends BaseRestHandler { + + public RestPauseAutoFollowPatternAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/pause", this); + } + + @Override + public String getName() { + return "ccr_pause_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + Request request = new Request(restRequest.param("name"), false); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java new file mode 100644 index 0000000000000..89f3f65fca7d3 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestResumeAutoFollowPatternAction.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import static org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.INSTANCE; + +public class RestResumeAutoFollowPatternAction extends BaseRestHandler { + + public RestResumeAutoFollowPatternAction(final RestController controller) { + controller.registerHandler(RestRequest.Method.POST, "/_ccr/auto_follow/{name}/resume", this); + } + + @Override + public String getName() { + return "ccr_resume_auto_follow_pattern_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + Request request = new Request(restRequest.param("name"), true); + return channel -> client.execute(INSTANCE, request, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 0bcb3daac6284..f030b99d0a010 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -9,6 +9,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Strings; @@ -16,25 +17,33 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; -import org.elasticsearch.xpack.core.ccr.action.FollowParameters; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; +import org.elasticsearch.xpack.core.ccr.action.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.notNullValue; @@ -392,6 +401,164 @@ public void testAutoFollowSoftDeletesDisabled() throws Exception { }); } + public void testPauseAndResumeAutoFollowPattern() throws Exception { + final Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + // index created in the remote cluster before the auto follow pattern exists won't be auto followed + createLeaderIndex("test-existing-index-is-ignored", leaderIndexSettings); + + // create the auto follow pattern + putAutoFollowPatterns("test-pattern", new String[]{"test-*", "tests-*"}); + assertBusy(() -> { + final AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(0L)); + }); + + // index created in the remote cluster are auto followed + createLeaderIndex("test-new-index-is-auto-followed", leaderIndexSettings); + assertBusy(() -> { + final AutoFollowStats autoFollowStats = getAutoFollowStats(); + assertThat(autoFollowStats.getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowStats.getNumberOfSuccessfulFollowIndices(), equalTo(1L)); + IndicesExistsRequest request = new IndicesExistsRequest("copy-test-new-index-is-auto-followed"); + assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists()); + }); + ensureFollowerGreen("copy-test-new-index-is-auto-followed"); + + // pause the auto follow pattern + pauseAutoFollowPattern("test-pattern"); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(0))); + + // indices created in the remote cluster are not auto followed because the pattern is paused + final int nbIndicesCreatedWhilePaused = randomIntBetween(1, 5); + for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) { + createLeaderIndex("test-index-created-while-pattern-is-paused-" + i, leaderIndexSettings); + } + + // sometimes create another index in the remote cluster and close (or delete) it right away + // it should not be auto followed when the pattern is resumed + if (randomBoolean()) { + final String indexName = "test-index-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT); + createLeaderIndex(indexName, leaderIndexSettings); + if (randomBoolean()) { + assertAcked(leaderClient().admin().indices().prepareClose(indexName)); + } else { + assertAcked(leaderClient().admin().indices().prepareDelete(indexName)); + } + } + + if (randomBoolean()) { + createLeaderIndex("logs-20200101", leaderIndexSettings); + } + + // pattern is paused, none of the newly created indices has been followed yet + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1)); + ensureLeaderGreen("test-index-created-while-pattern-is-paused-*"); + + // resume the auto follow pattern, indices created while the pattern was paused are picked up for auto-following + resumeAutoFollowPattern("test-pattern"); + assertBusy(() -> { + final Client client = followerClient(); + assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(1 + nbIndicesCreatedWhilePaused)); + for (int i = 0; i < nbIndicesCreatedWhilePaused; i++) { + IndicesExistsRequest request = new IndicesExistsRequest("copy-test-index-created-while-pattern-is-paused-" + i); + assertTrue(followerClient().admin().indices().exists(request).actionGet().isExists()); + } + }); + } + + public void testPauseAndResumeWithMultipleAutoFollowPatterns() throws Exception { + final Settings leaderIndexSettings = Settings.builder() + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .build(); + + final String[] prefixes = {"logs-", "users-", "docs-", "monitoring-", "data-", "system-", "events-", "files-"}; + if (randomBoolean()) { + // sometimes create indices in the remote cluster that match the future auto follow patterns + Arrays.stream(prefixes).forEach(prefix -> createLeaderIndex(prefix + "ignored", leaderIndexSettings)); + } + + // create auto follow patterns + final List autoFollowPatterns = new ArrayList<>(prefixes.length); + for (String prefix : prefixes) { + String name = prefix + "pattern"; + putAutoFollowPatterns(name, new String[]{prefix + "*"}); + autoFollowPatterns.add(name); + assertBusy(() -> assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(1))); + assertTrue(getAutoFollowPattern(name).isActive()); + } + + // no following indices are created yet + assertThat(followerClient().admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(0)); + + // create random indices in the remote cluster that match the patterns + final AtomicBoolean running = new AtomicBoolean(true); + final Set leaderIndices = ConcurrentCollections.newConcurrentSet(); + final Thread createNewLeaderIndicesThread = new Thread(() -> { + while (running.get()) { + try { + String indexName = randomFrom(prefixes) + randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createLeaderIndex(indexName, leaderIndexSettings); + leaderIndices.add(indexName); + Thread.sleep(randomIntBetween(100, 500)); + } catch (Exception e) { + throw new AssertionError(e); + } + } + }); + createNewLeaderIndicesThread.start(); + + // wait for some leader indices to be auto-followed + assertBusy(() -> + assertThat(getAutoFollowStats().getNumberOfSuccessfulFollowIndices(), greaterThanOrEqualTo((long) prefixes.length))); + + final int nbLeaderIndices = leaderIndices.size(); + + // pause some random patterns + final List pausedAutoFollowerPatterns = randomSubsetOf(autoFollowPatterns); + pausedAutoFollowerPatterns.forEach(this::pauseAutoFollowPattern); + assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertFalse(getAutoFollowPattern(pattern).isActive()))); + + assertBusy(() -> { + final int expectedAutoFollowedClusters = pausedAutoFollowerPatterns.size() != autoFollowPatterns.size() ? 1 : 0; + assertThat(getAutoFollowStats().getAutoFollowedClusters().size(), equalTo(expectedAutoFollowedClusters)); + if (expectedAutoFollowedClusters > 0) { + // wait for more indices to be created in the remote cluster while some patterns are paused + assertThat(leaderIndices.size(), greaterThan(nbLeaderIndices + 3)); + } + }); + ensureFollowerGreen(true, "copy-*"); + + // resume auto follow patterns + pausedAutoFollowerPatterns.forEach(this::resumeAutoFollowPattern); + assertBusy(() -> pausedAutoFollowerPatterns.forEach(pattern -> assertTrue(getAutoFollowPattern(pattern).isActive()))); + + // stop creating indices in the remote cluster + running.set(false); + createNewLeaderIndicesThread.join(); + + ensureLeaderGreen(leaderIndices.toArray(new String[0])); + + // check that all leader indices have been correctly auto followed + assertBusy(() -> { + final Client client = followerClient(); + assertThat(client.admin().indices().prepareStats("copy-*").get().getIndices().size(), equalTo(leaderIndices.size())); + leaderIndices.stream() + .map(leaderIndex -> "copy-" + leaderIndex) + .forEach(followerIndex -> + assertTrue("following index must exist: " + followerIndex, + client.admin().indices().exists(new IndicesExistsRequest(followerIndex)).actionGet().isExists())); + }); + } + private void putAutoFollowPatterns(String name, String[] patterns) { PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); request.setName(name); @@ -418,4 +585,21 @@ private void createLeaderIndex(String index, Settings settings) { leaderClient().admin().indices().create(request).actionGet(); } + private void pauseAutoFollowPattern(final String name) { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, false); + assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet()); + } + + private void resumeAutoFollowPattern(final String name) { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(name, true); + assertAcked(followerClient().execute(ActivateAutoFollowPatternAction.INSTANCE, request).actionGet()); + } + + private AutoFollowMetadata.AutoFollowPattern getAutoFollowPattern(final String name) { + GetAutoFollowPatternAction.Request request = new GetAutoFollowPatternAction.Request(); + request.setName(name); + GetAutoFollowPatternAction.Response response = followerClient().execute(GetAutoFollowPatternAction.INSTANCE, request).actionGet(); + assertTrue(response.getAutoFollowPatterns().containsKey(name)); + return response.getAutoFollowPatterns().get(name); + } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java index 26182781233e2..32b4b3ed9d174 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowMetadataTests.java @@ -44,7 +44,7 @@ protected AutoFollowMetadata createTestInstance() { randomAlphaOfLength(4), leaderPatterns, randomAlphaOfLength(4), - randomIntBetween(0, Integer.MAX_VALUE), + true, randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java index f2fb475816ec2..01f8234cb8014 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CCRFeatureSetTests.java @@ -101,7 +101,7 @@ public void testUsageStats() throws Exception { Map patterns = new HashMap<>(numAutoFollowPatterns); for (int i = 0; i < numAutoFollowPatterns; i++) { AutoFollowMetadata.AutoFollowPattern pattern = new AutoFollowMetadata.AutoFollowPattern("remote_cluser", - Collections.singletonList("logs" + i + "*"), null, null, null, null, null, null, null, null, null, null, null); + Collections.singletonList("logs" + i + "*"), null, true, null, null, null, null, null, null, null, null, null, null); patterns.put("pattern" + i, pattern); } metaData.putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index f8e7eab1c8647..13aa3208e5550 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -165,7 +165,7 @@ public void testAutoFollowCoordinatorLogsSkippingAutoFollowCoordinationWithNonCo @Override public ClusterState execute(ClusterState currentState) throws Exception { AutoFollowPattern autoFollowPattern = new AutoFollowPattern("test_alias", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata( Collections.singletonMap("test_alias", autoFollowPattern), Collections.emptyMap(), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java new file mode 100644 index 0000000000000..961bf94f658ac --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ActivateAutoFollowPatternActionRequestTests.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +public class ActivateAutoFollowPatternActionRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected ActivateAutoFollowPatternAction.Request createTestInstance() { + return new ActivateAutoFollowPatternAction.Request(randomAlphaOfLength(5), randomBoolean()); + } + + @Override + protected Writeable.Reader instanceReader() { + return ActivateAutoFollowPatternAction.Request::new; + } + + public void testValidate() { + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request(null, true); + ActionRequestValidationException validationException = request.validate(); + assertThat(validationException, notNullValue()); + assertThat(validationException.getMessage(), containsString("[name] is missing")); + + request = new ActivateAutoFollowPatternAction.Request("name", true); + validationException = request.validate(); + assertThat(validationException, nullValue()); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java index e3edf0489d745..ba2afd788f682 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java @@ -38,6 +38,8 @@ import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata.AutoFollowPattern; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import java.util.ArrayList; @@ -52,6 +54,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -59,8 +62,12 @@ import java.util.function.Function; import java.util.function.Supplier; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.cleanFollowedRemoteIndices; import static org.elasticsearch.xpack.ccr.action.AutoFollowCoordinator.AutoFollower.recordLeaderIndexAsFollowFunction; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; @@ -68,6 +75,7 @@ import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.hamcrest.Matchers.startsWith; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -81,7 +89,7 @@ public void testAutoFollower() { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -150,7 +158,7 @@ public void testAutoFollowerClusterStateApiFailure() { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -202,7 +210,7 @@ public void testAutoFollowerUpdateClusterStateFailure() { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -253,13 +261,232 @@ void updateAutoFollowMetadata(Function updateFunctio assertThat(invoked[0], is(true)); } + public void testAutoFollowerWithNoActivePatternsDoesNotStart() { + final String remoteCluster = randomAlphaOfLength(5); + + final Map autoFollowPatterns = new HashMap<>(2); + autoFollowPatterns.put("pattern_1", new AutoFollowPattern(remoteCluster, Arrays.asList("logs-*", "test-*"), "copy-", false, + null, null, null, null, null, null, null, null, null, null)); + autoFollowPatterns.put("pattern_2", new AutoFollowPattern(remoteCluster, Arrays.asList("users-*"), "copy-", false, null, null, + null, null, null, null, null, null, null, null)); + + final Map> followedLeaderIndexUUIDs = new HashMap<>(2); + followedLeaderIndexUUIDs.put("pattern_1", Arrays.asList("uuid1", "uuid2")); + followedLeaderIndexUUIDs.put("pattern_2", Collections.emptyList()); + + final Map> headers = new HashMap<>(2); + headers.put("pattern_1", singletonMap("header", "value")); + headers.put("pattern_2", emptyMap()); + + final Supplier followerClusterStateSupplier = localClusterStateSupplier(ClusterState.builder(new ClusterName("test")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(autoFollowPatterns, followedLeaderIndexUUIDs, headers)) + .build()) + .build()); + + final AtomicBoolean invoked = new AtomicBoolean(false); + final AutoFollower autoFollower = + new AutoFollower(remoteCluster, v -> invoked.set(true), followerClusterStateSupplier, () -> 1L, Runnable::run) { + @Override + void getRemoteClusterState(String remote, long metadataVersion, BiConsumer handler) { + invoked.set(true); + } + + @Override + void createAndFollow(Map headers, PutFollowAction.Request request, + Runnable successHandler, Consumer failureHandler) { + invoked.set(true); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + invoked.set(true); + } + }; + + autoFollower.start(); + assertThat(invoked.get(), is(false)); + } + + public void testAutoFollowerWithPausedActivePatterns() { + final String remoteCluster = randomAlphaOfLength(5); + + final AtomicReference remoteClusterState = new AtomicReference<>( + createRemoteClusterState("patternLogs-0", true, randomLongBetween(1L, 1_000L)) + ); + + final AtomicReference localClusterState = new AtomicReference<>( + ClusterState.builder(new ClusterName("local")) + .metaData(MetaData.builder() + .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(emptyMap(), emptyMap(), emptyMap()))) + .build() + ); + + // compute and return the local cluster state, updated with some auto-follow patterns + final Supplier localClusterStateSupplier = () -> localClusterState.updateAndGet(currentLocalState -> { + final int nextClusterStateVersion = (int) (currentLocalState.version() + 1); + + final ClusterState nextLocalClusterState; + if (nextClusterStateVersion == 1) { + // cluster state #1 : one pattern is active + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("patternLogs"); + request.setRemoteCluster(remoteCluster); + request.setLeaderIndexPatterns(singletonList("patternLogs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + nextLocalClusterState = + TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get()); + + } else if (nextClusterStateVersion == 2) { + // cluster state #2 : still one pattern is active + nextLocalClusterState = currentLocalState; + + } else if (nextClusterStateVersion == 3) { + // cluster state #3 : add a new pattern, two patterns are active + PutAutoFollowPatternAction.Request request = new PutAutoFollowPatternAction.Request(); + request.setName("patternDocs"); + request.setRemoteCluster(remoteCluster); + request.setLeaderIndexPatterns(singletonList("patternDocs-*")); + request.setFollowIndexNamePattern("copy-{{leader_index}}"); + nextLocalClusterState = + TransportPutAutoFollowPatternAction.innerPut(request, emptyMap(), currentLocalState, remoteClusterState.get()); + + } else if (nextClusterStateVersion == 4) { + // cluster state #4 : still both patterns are active + nextLocalClusterState = currentLocalState; + + } else if (nextClusterStateVersion == 5) { + // cluster state #5 : first pattern is paused, second pattern is still active + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternLogs", false); + nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState); + + } else if (nextClusterStateVersion == 6) { + // cluster state #5 : second pattern is paused, both patterns are inactive + ActivateAutoFollowPatternAction.Request request = new ActivateAutoFollowPatternAction.Request("patternDocs", false); + nextLocalClusterState = TransportActivateAutoFollowPatternAction.innerActivate(request, currentLocalState); + + } else { + return currentLocalState; + } + + return ClusterState.builder(nextLocalClusterState) + .version(nextClusterStateVersion) + .build(); + }); + + final Set followedIndices = ConcurrentCollections.newConcurrentSet(); + final List autoFollowResults = new ArrayList<>(); + + final AutoFollower autoFollower = + new AutoFollower(remoteCluster, autoFollowResults::addAll, localClusterStateSupplier, () -> 1L, Runnable::run) { + + int countFetches = 1; // to be aligned with local cluster state updates + ClusterState lastFetchedRemoteClusterState; + + @Override + void getRemoteClusterState(String remote, long metadataVersion, BiConsumer handler) { + assertThat(remote, equalTo(remoteCluster)); + + // in this test, every time it fetches the remote cluster state new leader indices to follow appears + final String[] newLeaderIndices = {"patternLogs-" + countFetches, "patternDocs-" + countFetches}; + + if (countFetches == 1) { + assertThat("first invocation, it should retrieve the metadata version 1", metadataVersion, equalTo(1L)); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + + } else if (countFetches == 2 || countFetches == 4) { + assertThat("no patterns changes, it should retrieve the last known metadata version + 1", + metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version() + 1)); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + assertThat("remote cluster state metadata version is aligned with what the auto-follower is requesting", + lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion)); + + } else if (countFetches == 3 || countFetches == 5) { + assertThat("patterns have changed, it should retrieve the last known metadata version again", + metadataVersion, equalTo(lastFetchedRemoteClusterState.metaData().version())); + lastFetchedRemoteClusterState = createRemoteClusterState(remoteClusterState.get(), newLeaderIndices); + assertThat("remote cluster state metadata version is incremented", + lastFetchedRemoteClusterState.getMetaData().version(), equalTo(metadataVersion + 1)); + } else { + fail("after the 5th invocation there are no more active patterns, the auto-follower should have stopped"); + } + + countFetches = countFetches + 1; + remoteClusterState.set(lastFetchedRemoteClusterState); + handler.accept(new ClusterStateResponse(lastFetchedRemoteClusterState.getClusterName(), + lastFetchedRemoteClusterState, false), null); + } + + @Override + void createAndFollow(Map headers, PutFollowAction.Request request, + Runnable successHandler, Consumer failureHandler) { + assertThat(request.getRemoteCluster(), equalTo(remoteCluster)); + assertThat(request.getFollowerIndex(), startsWith("copy-")); + followedIndices.add(request.getLeaderIndex()); + successHandler.run(); + } + + @Override + void updateAutoFollowMetadata(Function updateFunction, Consumer handler) { + localClusterState.updateAndGet(updateFunction::apply); + handler.accept(null); + } + + @Override + void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List patterns) { + // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice + } + }; + + autoFollower.start(); + + assertThat(autoFollowResults.size(), equalTo(7)); + assertThat(followedIndices, containsInAnyOrder( + "patternLogs-1", // iteration #1 : only pattern "patternLogs" is active in local cluster state + "patternLogs-2", // iteration #2 : only pattern "patternLogs" is active in local cluster state + "patternLogs-3", // iteration #3 : both patterns "patternLogs" and "patternDocs" are active in local cluster state + "patternDocs-3", // + "patternLogs-4", // iteration #4 : both patterns "patternLogs" and "patternDocs" are active in local cluster state + "patternDocs-4", // + "patternDocs-5" // iteration #5 : only pattern "patternDocs" is active in local cluster state, "patternLogs" is paused + )); + + final ClusterState finalRemoteClusterState = remoteClusterState.get(); + final ClusterState finalLocalClusterState = localClusterState.get(); + + AutoFollowMetadata autoFollowMetadata = finalLocalClusterState.metaData().custom(AutoFollowMetadata.TYPE); + assertThat(autoFollowMetadata.getPatterns().size(), equalTo(2)); + assertThat(autoFollowMetadata.getPatterns().values().stream().noneMatch(AutoFollowPattern::isActive), is(true)); + + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternLogs"), + containsInAnyOrder( + finalRemoteClusterState.metaData().index("patternLogs-0").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-1").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-2").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-3").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternLogs-4").getIndexUUID() + // patternLogs-5 exists in remote cluster state but patternLogs was paused + )); + + assertThat(autoFollowMetadata.getFollowedLeaderIndexUUIDs().get("patternDocs"), + containsInAnyOrder( + // patternDocs-0 does not exist in remote cluster state + finalRemoteClusterState.metaData().index("patternDocs-1").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-2").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-3").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-4").getIndexUUID(), + finalRemoteClusterState.metaData().index("patternDocs-5").getIndexUUID() + )); + } + public void testAutoFollowerCreateAndFollowApiCallFailure() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -317,13 +544,9 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa } public void testGetLeaderIndicesToFollow() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), - null, null, null, null, null, null, null, null, null, null, null); + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, + null, null, null, null, null, null, null, null, null, null); Map> headers = new HashMap<>(); - ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) - .build(); RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); MetaData.Builder imdBuilder = MetaData.builder(); @@ -368,7 +591,7 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(3).getName(), equalTo("metrics-3")); assertThat(result.get(4).getName(), equalTo("metrics-4")); - List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); + final List followedIndexUUIDs = Collections.singletonList(remoteState.metaData().index("metrics-2").getIndexUUID()); result = AutoFollower.getLeaderIndicesToFollow(autoFollowPattern, remoteState, followedIndexUUIDs); result.sort(Comparator.comparing(Index::getName)); assertThat(result.size(), equalTo(4)); @@ -376,16 +599,20 @@ public void testGetLeaderIndicesToFollow() { assertThat(result.get(1).getName(), equalTo("metrics-1")); assertThat(result.get(2).getName(), equalTo("metrics-3")); assertThat(result.get(3).getName(), equalTo("metrics-4")); + + final AutoFollowPattern inactiveAutoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, + false, null, null, null, null, null, null, null, null, null, null); + + result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, Collections.emptyList()); + assertThat(result.size(), equalTo(0)); + + result = AutoFollower.getLeaderIndicesToFollow(inactiveAutoFollowPattern, remoteState, followedIndexUUIDs); + assertThat(result.size(), equalTo(0)); } public void testGetLeaderIndicesToFollow_shardsNotStarted() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), - null, null, null, null, null, null, null, null, null, null, null); - Map> headers = new HashMap<>(); - ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, - new AutoFollowMetadata(Collections.singletonMap("remote", autoFollowPattern), Collections.emptyMap(), headers))) - .build(); + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), null, true, + null, null, null, null, null, null, null, null, null, null); // 1 shard started and another not started: ClusterState remoteState = createRemoteClusterState("index1", true); @@ -425,7 +652,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() { public void testGetLeaderIndicesToFollowWithClosedIndices() { final AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); // index is opened ClusterState remoteState = ClusterStateCreationUtils.stateWithActivePrimary("test-index", true, randomIntBetween(1, 3), 0); @@ -552,15 +779,15 @@ public void testCleanFollowedLeaderIndicesNoEntry() { } public void testGetFollowerIndexName() { - AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, null, + AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("metrics-0")); - autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", null, null, + autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-metrics-0", true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); - autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", null, + autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("metrics-*"), "eu-{{leader_index}}", true, null, null, null, null, null, null, null, null, null, null); assertThat(AutoFollower.getFollowerIndexName(autoFollowPattern, "metrics-0"), equalTo("eu-metrics-0")); } @@ -643,11 +870,11 @@ public void testUpdateAutoFollowers() { Runnable::run); // Add 3 patterns: Map patterns = new HashMap<>(); - patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, null, null, + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null, null, null, null, null, null, null, null, null)); - patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, null, null, + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null, null, null, null, null, null, null, null, null)); - patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, null, null, + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null)); ClusterState clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -673,7 +900,7 @@ public void testUpdateAutoFollowers() { assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); assertThat(removedAutoFollower1.removed, is(true)); // Add pattern 4: - patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, null, null, + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null, null, null, null, null, null, null, null, null)); clusterState = ClusterState.builder(new ClusterName("remote")) .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, @@ -733,12 +960,100 @@ public void testUpdateAutoFollowersNoAutoFollowMetadata() { assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); } + public void testUpdateAutoFollowersNoActivePatterns() { + final ClusterService clusterService = mockClusterService(); + final AutoFollowCoordinator autoFollowCoordinator = new AutoFollowCoordinator( + Settings.EMPTY, + null, + clusterService, + new CcrLicenseChecker(() -> true, () -> false), + () -> 1L, + () -> 1L, + Runnable::run); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.EMPTY_STATE); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + + // Add 3 patterns: + Map patterns = new HashMap<>(); + patterns.put("pattern1", new AutoFollowPattern("remote1", Collections.singletonList("logs-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern2", new AutoFollowPattern("remote2", Collections.singletonList("logs-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.put("pattern3", new AutoFollowPattern("remote2", Collections.singletonList("metrics-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(2)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + + AutoFollowCoordinator.AutoFollower removedAutoFollower1 = autoFollowCoordinator.getAutoFollowers().get("remote1"); + assertThat(removedAutoFollower1.removed, is(false)); + AutoFollowCoordinator.AutoFollower removedAutoFollower2 = autoFollowCoordinator.getAutoFollowers().get("remote2"); + assertThat(removedAutoFollower2.removed, is(false)); + + // Make pattern 1 and pattern 3 inactive + patterns.computeIfPresent("pattern1", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + patterns.computeIfPresent("pattern3", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote2"), notNullValue()); + assertThat(removedAutoFollower1.removed, is(true)); + assertThat(removedAutoFollower2.removed, is(false)); + + // Add active pattern 4 and make pattern 2 inactive + patterns.put("pattern4", new AutoFollowPattern("remote1", Collections.singletonList("metrics-*"), null, true, null, null, + null, null, null, null, null, null, null, null)); + patterns.computeIfPresent("pattern2", (name, pattern) -> new AutoFollowPattern(pattern.getRemoteCluster(), + pattern.getLeaderIndexPatterns(), pattern.getFollowIndexPattern(), false, pattern.getMaxReadRequestOperationCount(), + pattern.getMaxWriteRequestOperationCount(), pattern.getMaxOutstandingReadRequests(), pattern.getMaxOutstandingWriteRequests(), + pattern.getMaxReadRequestSize(), pattern.getMaxWriteRequestSize(), pattern.getMaxWriteBufferCount(), + pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), pattern.getReadPollTimeout())); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(1)); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().get("remote1"), notNullValue()); + + AutoFollowCoordinator.AutoFollower removedAutoFollower4 = autoFollowCoordinator.getAutoFollowers().get("remote1"); + assertThat(removedAutoFollower4.removed, is(false)); + assertNotSame(removedAutoFollower4, removedAutoFollower1); + assertThat(removedAutoFollower2.removed, is(true)); + + autoFollowCoordinator.updateAutoFollowers(ClusterState.builder(new ClusterName("remote")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata(Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap()))) + .build()); + assertThat(autoFollowCoordinator.getStats().getAutoFollowedClusters().size(), equalTo(0)); + assertThat(removedAutoFollower1.removed, is(true)); + assertThat(removedAutoFollower2.removed, is(true)); + assertThat(removedAutoFollower4.removed, is(true)); + } + public void testWaitForMetadataVersion() { Client client = mock(Client.class); when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -801,7 +1116,7 @@ public void testWaitForTimeOut() { when(client.getRemoteClusterClient(anyString())).thenReturn(client); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -860,7 +1175,7 @@ public void testAutoFollowerSoftDeletesDisabled() { createRemoteClusterState("logs-20190101", null); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -926,7 +1241,7 @@ public void testAutoFollowerFollowerIndexAlreadyExists() { ClusterState remoteState = createRemoteClusterState("logs-20190101", true); AutoFollowPattern autoFollowPattern = new AutoFollowPattern("remote", Collections.singletonList("logs-*"), - null, null, null, null, null, null, null, null, null, null, null); + null, true, null, null, null, null, null, null, null, null, null, null); Map patterns = new HashMap<>(); patterns.put("remote", autoFollowPattern); Map> followedLeaderIndexUUIDS = new HashMap<>(); @@ -1012,7 +1327,7 @@ public void testRepeatedFailures() throws InterruptedException { "remote", Collections.singletonList("*"), "{}", - 0, + true, 0, 0, 0, 0, @@ -1087,8 +1402,8 @@ public void testClosedIndicesAreNotAutoFollowed() { .metaData(MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(Collections.singletonMap(pattern, - new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, null, null, null, null, null, null, null, - null, null, null)), + new AutoFollowPattern("remote", Collections.singletonList("docs-*"), null, true, + null, null, null, null, null, null, null, null, null, null)), Collections.singletonMap(pattern, Collections.emptyList()), Collections.singletonMap(pattern, Collections.emptyMap())))) .build(); @@ -1160,6 +1475,10 @@ void cleanFollowedRemoteIndices(ClusterState remoteClusterState, List pa } private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes) { + return createRemoteClusterState(indexName, enableSoftDeletes, 0L); + } + + private static ClusterState createRemoteClusterState(String indexName, Boolean enableSoftDeletes, long metadataVersion) { Settings.Builder indexSettings; if (enableSoftDeletes != null) { indexSettings = settings(Version.CURRENT).put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), enableSoftDeletes); @@ -1173,7 +1492,9 @@ private static ClusterState createRemoteClusterState(String indexName, Boolean e .numberOfReplicas(0) .build(); ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("remote")) - .metaData(MetaData.builder().put(indexMetaData, true)); + .metaData(MetaData.builder() + .put(indexMetaData, true) + .version(metadataVersion)); ShardRouting shardRouting = TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); @@ -1183,25 +1504,29 @@ private static ClusterState createRemoteClusterState(String indexName, Boolean e return csBuilder.build(); } - private static ClusterState createRemoteClusterState(ClusterState previous, String indexName) { - IndexMetaData indexMetaData = IndexMetaData.builder(indexName) - .settings(settings(Version.CURRENT) - .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) - .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) - .numberOfShards(1) - .numberOfReplicas(0) + private static ClusterState createRemoteClusterState(final ClusterState previous, final String... indices) { + if (indices == null) { + return previous; + } + final MetaData.Builder metadataBuilder = MetaData.builder(previous.metaData()).version(previous.metaData().version() + 1); + final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(previous.routingTable()); + for (String indexName : indices) { + IndexMetaData indexMetaData = IndexMetaData.builder(indexName) + .settings(settings(Version.CURRENT) + .put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true) + .put(IndexMetaData.SETTING_INDEX_UUID, UUIDs.randomBase64UUID(random()))) + .numberOfShards(1) + .numberOfReplicas(0) + .build(); + metadataBuilder.put(indexMetaData, true); + routingTableBuilder.add(IndexRoutingTable.builder(indexMetaData.getIndex()) + .addShard(TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted()) + .build()); + } + return ClusterState.builder(previous.getClusterName()) + .metaData(metadataBuilder.build()) + .routingTable(routingTableBuilder.build()) .build(); - ClusterState.Builder csBuilder = ClusterState.builder(previous.getClusterName()) - .metaData(MetaData.builder(previous.metaData()) - .version(previous.metaData().version() + 1) - .put(indexMetaData, true)); - - ShardRouting shardRouting = - TestShardRouting.newShardRouting(indexName, 0, "1", true, ShardRoutingState.INITIALIZING).moveToStarted(); - IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex()).addShard(shardRouting).build(); - csBuilder.routingTable(RoutingTable.builder(previous.routingTable()).add(indexRoutingTable).build()).build(); - - return csBuilder.build(); } private static Supplier localClusterStateSupplier(ClusterState... states) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java index 55582815ce5e6..5fd6381001d36 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/GetAutoFollowPatternResponseTests.java @@ -33,7 +33,7 @@ protected GetAutoFollowPatternAction.Response createTestInstance() { "remote", Collections.singletonList(randomAlphaOfLength(4)), randomAlphaOfLength(4), - randomIntBetween(0, Integer.MAX_VALUE), + true, randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), randomIntBetween(0, Integer.MAX_VALUE), diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java new file mode 100644 index 0000000000000..18fa6cbd1dbf3 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportActivateAutoFollowPatternActionTests.java @@ -0,0 +1,96 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; +import org.elasticsearch.xpack.core.ccr.action.ActivateAutoFollowPatternAction.Request; + +import java.util.Arrays; + +import static java.util.Collections.singletonMap; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.sameInstance; + +public class TransportActivateAutoFollowPatternActionTests extends ESTestCase { + + public void testInnerActivateNoAutoFollowMetadata() { + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("test", true), ClusterState.EMPTY_STATE)); + assertThat(e.getMessage(), equalTo("auto-follow pattern [test] is missing")); + } + + public void testInnerActivateDoesNotExist() { + ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata( + singletonMap("remote_cluster", randomAutoFollowPattern()), + singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")), + singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3")))))) + .build(); + Exception e = expectThrows(ResourceNotFoundException.class, + () -> TransportActivateAutoFollowPatternAction.innerActivate(new Request("does_not_exist", true), clusterState)); + assertThat(e.getMessage(), equalTo("auto-follow pattern [does_not_exist] is missing")); + } + + public void testInnerActivateToggle() { + final AutoFollowMetadata.AutoFollowPattern autoFollowPattern = randomAutoFollowPattern(); + final ClusterState clusterState = ClusterState.builder(new ClusterName("cluster")) + .metaData(MetaData.builder().putCustom(AutoFollowMetadata.TYPE, + new AutoFollowMetadata( + singletonMap("remote_cluster", autoFollowPattern), + singletonMap("remote_cluster", randomSubsetOf(randomIntBetween(1, 3), "uuid0", "uuid1", "uuid2")), + singletonMap("remote_cluster", singletonMap("header0", randomFrom("val0", "val2", "val3")))))) + .build(); + { + Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive()); + ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState); + assertThat(updatedState, sameInstance(clusterState)); + } + { + Request pauseRequest = new Request("remote_cluster", autoFollowPattern.isActive() == false); + ClusterState updatedState = TransportActivateAutoFollowPatternAction.innerActivate(pauseRequest, clusterState); + assertThat(updatedState, not(sameInstance(clusterState))); + + AutoFollowMetadata updatedAutoFollowMetadata = updatedState.getMetaData().custom(AutoFollowMetadata.TYPE); + assertNotEquals(updatedAutoFollowMetadata, notNullValue()); + + AutoFollowMetadata autoFollowMetadata = clusterState.getMetaData().custom(AutoFollowMetadata.TYPE); + assertNotEquals(updatedAutoFollowMetadata, autoFollowMetadata); + assertThat(updatedAutoFollowMetadata.getPatterns().size(), equalTo(autoFollowMetadata.getPatterns().size())); + assertThat(updatedAutoFollowMetadata.getPatterns().get("remote_cluster").isActive(), not(autoFollowPattern.isActive())); + + assertEquals(updatedAutoFollowMetadata.getFollowedLeaderIndexUUIDs(), autoFollowMetadata.getFollowedLeaderIndexUUIDs()); + assertEquals(updatedAutoFollowMetadata.getHeaders(), autoFollowMetadata.getHeaders()); + } + } + + private static AutoFollowMetadata.AutoFollowPattern randomAutoFollowPattern() { + return new AutoFollowMetadata.AutoFollowPattern(randomAlphaOfLength(5), + randomSubsetOf(Arrays.asList("test-*", "user-*", "logs-*", "failures-*")), + randomFrom("{{leader_index}}", "{{leader_index}}-follower", "test"), + randomBoolean(), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + randomIntBetween(1, 100), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + randomIntBetween(1, 100), + new ByteSizeValue(randomIntBetween(1, 100), randomFrom(ByteSizeUnit.values())), + TimeValue.timeValueSeconds(randomIntBetween(30, 600)), + TimeValue.timeValueSeconds(randomIntBetween(30, 600))); + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java index 5ef43fc05c81c..06bb95e333ff5 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportDeleteAutoFollowPatternActionTests.java @@ -32,8 +32,8 @@ public void testInnerDelete() { { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); - existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -43,8 +43,8 @@ public void testInnerDelete() { { List existingPatterns = new ArrayList<>(); existingPatterns.add("logs-*"); - existingAutoFollowPatterns.put("name2", - new AutoFollowPattern("asia_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name2", new AutoFollowPattern("asia_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); @@ -76,8 +76,8 @@ public void testInnerDeleteDoesNotExist() { { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); - existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + existingAutoFollowPatterns.put("name1", new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, + null, null, null, null, null, null, null)); existingHeaders.put("key", Collections.singletonMap("key", "val")); } ClusterState clusterState = ClusterState.builder(new ClusterName("us_cluster")) diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java index 85e1bf916aa3c..7de170442f137 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportGetAutoFollowPatternActionTests.java @@ -24,9 +24,9 @@ public class TransportGetAutoFollowPatternActionTests extends ESTestCase { public void testGetAutoFollowPattern() { Map patterns = new HashMap<>(); patterns.put("name1", new AutoFollowPattern( - "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null)); + "test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null)); patterns.put("name2", new AutoFollowPattern( - "test_alias1", Collections.singletonList("index-*"), null, null, null, null, null, null, null, null, null, null, null)); + "test_alias1", Collections.singletonList("index-*"), null, true, null, null, null, null, null, null, null, null, null, null)); MetaData metaData = MetaData.builder() .putCustom(AutoFollowMetadata.TYPE, new AutoFollowMetadata(patterns, Collections.emptyMap(), Collections.emptyMap())) .build(); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java index ac556d47c85dd..05315f239be83 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternActionTests.java @@ -103,7 +103,7 @@ public void testInnerPut_existingLeaderIndicesAndAutoFollowMetadata() { List existingPatterns = new ArrayList<>(); existingPatterns.add("transactions-*"); existingAutoFollowPatterns.put("name1", - new AutoFollowPattern("eu_cluster", existingPatterns, null, null, null, null, null, null, null, null, null, null, null)); + new AutoFollowPattern("eu_cluster", existingPatterns, null, true, null, null, null, null, null, null, null, null, null, null)); Map> existingAlreadyFollowedIndexUUIDS = new HashMap<>(); List existingUUIDS = new ArrayList<>(); existingUUIDS.add("_val"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java index a8758ed6c2d5a..f0c32c57bad34 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/AutoFollowMetadata.java @@ -16,7 +16,7 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; -import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.xpack.core.XPackPlugin; @@ -174,8 +174,9 @@ public int hashCode() { return Objects.hash(patterns, followedLeaderIndexUUIDs, headers); } - public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentObject { + public static class AutoFollowPattern extends ImmutableFollowParameters implements ToXContentFragment { + public static final ParseField ACTIVE = new ParseField("active"); public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); public static final ParseField LEADER_PATTERNS_FIELD = new ParseField("leader_index_patterns"); public static final ParseField FOLLOW_PATTERN_FIELD = new ParseField("follow_index_pattern"); @@ -183,24 +184,28 @@ public static class AutoFollowPattern extends ImmutableFollowParameters implemen @SuppressWarnings("unchecked") private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("auto_follow_pattern", - args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], (Integer) args[3], - (Integer) args[4], (Integer) args[5], (Integer) args[6], (ByteSizeValue) args[7], (ByteSizeValue) args[8], - (Integer) args[9], (ByteSizeValue) args[10], (TimeValue) args[11], (TimeValue) args[12])); + args -> new AutoFollowPattern((String) args[0], (List) args[1], (String) args[2], + args[3] == null || (boolean) args[3], (Integer) args[4], (Integer) args[5], (Integer) args[6], (Integer) args[7], + (ByteSizeValue) args[8], (ByteSizeValue) args[9], (Integer) args[10], (ByteSizeValue) args[11], (TimeValue) args[12], + (TimeValue) args[13])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), REMOTE_CLUSTER_FIELD); PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), LEADER_PATTERNS_FIELD); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOW_PATTERN_FIELD); + PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), ACTIVE); ImmutableFollowParameters.initParser(PARSER); } private final String remoteCluster; private final List leaderIndexPatterns; private final String followIndexPattern; + private final boolean active; public AutoFollowPattern(String remoteCluster, List leaderIndexPatterns, String followIndexPattern, + boolean active, Integer maxReadRequestOperationCount, Integer maxWriteRequestOperationCount, Integer maxOutstandingReadRequests, @@ -216,6 +221,7 @@ public AutoFollowPattern(String remoteCluster, this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; + this.active = active; } public static AutoFollowPattern readFrom(StreamInput in) throws IOException { @@ -228,6 +234,11 @@ private AutoFollowPattern(String remoteCluster, List leaderIndexPatterns this.remoteCluster = remoteCluster; this.leaderIndexPatterns = leaderIndexPatterns; this.followIndexPattern = followIndexPattern; + if (in.getVersion().onOrAfter(Version.V_7_5_0)) { + this.active = in.readBoolean(); + } else { + this.active = true; + } } public boolean match(String indexName) { @@ -250,16 +261,24 @@ public String getFollowIndexPattern() { return followIndexPattern; } + public boolean isActive() { + return active; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(remoteCluster); out.writeStringCollection(leaderIndexPatterns); out.writeOptionalString(followIndexPattern); super.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_7_5_0)) { + out.writeBoolean(active); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(ACTIVE.getPreferredName(), active); builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.array(LEADER_PATTERNS_FIELD.getPreferredName(), leaderIndexPatterns.toArray(new String[0])); if (followIndexPattern != null) { @@ -269,25 +288,21 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws return builder; } - @Override - public boolean isFragment() { - return true; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; AutoFollowPattern pattern = (AutoFollowPattern) o; - return remoteCluster.equals(pattern.remoteCluster) && + return active == pattern.active && + remoteCluster.equals(pattern.remoteCluster) && leaderIndexPatterns.equals(pattern.leaderIndexPatterns) && followIndexPattern.equals(pattern.followIndexPattern); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern); + return Objects.hash(super.hashCode(), remoteCluster, leaderIndexPatterns, followIndexPattern, active); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java new file mode 100644 index 0000000000000..5b8f644ceca67 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ActivateAutoFollowPatternAction.java @@ -0,0 +1,84 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.support.master.AcknowledgedRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; + +public class ActivateAutoFollowPatternAction extends ActionType { + + public static final String NAME = "cluster:admin/xpack/ccr/auto_follow_pattern/activate"; + public static final ActivateAutoFollowPatternAction INSTANCE = new ActivateAutoFollowPatternAction(); + + private ActivateAutoFollowPatternAction() { + super(NAME, AcknowledgedResponse::new); + } + + public static class Request extends AcknowledgedRequest { + + private final String name; + private final boolean active; + + public Request(final String name, final boolean active) { + this.name = name; + this.active = active; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (name == null) { + validationException = addValidationError("[name] is missing", validationException); + } + return validationException; + } + + public String getName() { + return name; + } + + public boolean isActive() { + return active; + } + + public Request(final StreamInput in) throws IOException { + super(in); + name = in.readString(); + active = in.readBoolean(); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(name); + out.writeBoolean(active); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return active == request.active + && Objects.equals(name, request.name); + } + + @Override + public int hashCode() { + return Objects.hash(name, active); + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json new file mode 100644 index 0000000000000..9e76b83bb904f --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.pause_auto_follow_pattern.json @@ -0,0 +1,24 @@ +{ + "ccr.pause_auto_follow_pattern":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-pause-auto-follow-pattern.html" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_ccr/auto_follow/{name}/pause", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the auto follow pattern that should pause discovering new indices to follow." + } + } + } + ] + } + } +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json new file mode 100644 index 0000000000000..96b77cb82e93f --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.resume_auto_follow_pattern.json @@ -0,0 +1,24 @@ +{ + "ccr.resume_auto_follow_pattern":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-resume-auto-follow-pattern.html" + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_ccr/auto_follow/{name}/resume", + "methods":[ + "POST" + ], + "parts":{ + "name":{ + "type":"string", + "description":"The name of the auto follow pattern to resume discovering new indices to follow." + } + } + } + ] + } + } +}