diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy index d56cb1926f3e2..439a60e6c3aaf 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/RestTestsFromSnippetsTask.groovy @@ -104,7 +104,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { * format of the response is incompatible i.e. it is not a JSON object. */ static shouldAddShardFailureCheck(String path) { - return path.startsWith('_cat') == false && path.startsWith('_ml/datafeeds/') == false + return path.startsWith('_cat') == false && path.startsWith('_ml/datafeeds/') == false } /** @@ -294,7 +294,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { } void emitDo(String method, String pathAndQuery, String body, - String catchPart, List warnings, boolean inSetup) { + String catchPart, List warnings, boolean inSetup, boolean skipShardFailures) { def (String path, String query) = pathAndQuery.tokenize('?') if (path == null) { path = '' // Catch requests to the root... @@ -346,7 +346,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { * section so we have to skip it there. We also omit the assertion * from APIs that don't return a JSON object */ - if (false == inSetup && shouldAddShardFailureCheck(path)) { + if (false == inSetup && skipShardFailures == false && shouldAddShardFailureCheck(path)) { current.println(" - is_false: _shards.failures") } } @@ -394,7 +394,7 @@ public class RestTestsFromSnippetsTask extends SnippetsTask { pathAndQuery = pathAndQuery.substring(1) } emitDo(method, pathAndQuery, body, catchPart, snippet.warnings, - inSetup) + inSetup, snippet.skipShardsFailures) } } diff --git a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy index fbc231aa764dc..83a6a05ec5df7 100644 --- a/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy +++ b/buildSrc/src/main/groovy/org/elasticsearch/gradle/doc/SnippetsTask.groovy @@ -45,7 +45,7 @@ public class SnippetsTask extends DefaultTask { private static final String WARNING = /warning:(.+)/ private static final String CAT = /(_cat)/ private static final String TEST_SYNTAX = - /(?:$CATCH|$SUBSTITUTION|$SKIP|(continued)|$SETUP|$WARNING) ?/ + /(?:$CATCH|$SUBSTITUTION|$SKIP|(continued)|$SETUP|$WARNING|(skip_shard_failures)) ?/ /** * Action to take on each snippet. Called with a single parameter, an @@ -233,6 +233,10 @@ public class SnippetsTask extends DefaultTask { snippet.warnings.add(it.group(7)) return } + if (it.group(8) != null) { + snippet.skipShardsFailures = true + return + } throw new InvalidUserDataException( "Invalid test marker: $line") } @@ -329,6 +333,7 @@ public class SnippetsTask extends DefaultTask { String setup = null boolean curl List warnings = new ArrayList() + boolean skipShardsFailures = false @Override public String toString() { @@ -359,6 +364,9 @@ public class SnippetsTask extends DefaultTask { for (String warning in warnings) { result += "[warning:$warning]" } + if (skipShardsFailures) { + result += '[skip_shard_failures]' + } } if (testResponse) { result += '// TESTRESPONSE' diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java index b6c6866966725..373b94124d43e 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.ccr.FollowInfoResponse; import org.elasticsearch.client.ccr.FollowStatsRequest; import org.elasticsearch.client.ccr.FollowStatsResponse; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse; import org.elasticsearch.client.ccr.PauseFollowRequest; @@ -36,6 +37,7 @@ import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.core.BroadcastResponse; import java.io.IOException; import java.util.Collections; @@ -233,6 +235,48 @@ public void unfollowAsync(UnfollowRequest request, ); } + /** + * Instructs an index acting as a leader index to forget the specified follower index. + * + * See the docs for more details + * on the intended usage of this API. + * + * @param request the request + * @param options the request options (e.g., headers), use {@link RequestOptions#DEFAULT} if the defaults are acceptable. + * @return the response + * @throws IOException if an I/O exception occurs while executing this request + */ + public BroadcastResponse forgetFollower(final ForgetFollowerRequest request, final RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + request, + CcrRequestConverters::forgetFollower, + options, + BroadcastResponse::fromXContent, + Collections.emptySet()); + } + + /** + * Asynchronously instructs an index acting as a leader index to forget the specified follower index. + * + * See the docs for more details + * on the intended usage of this API. + * + * @param request the request + * @param options the request options (e.g., headers), use {@link RequestOptions#DEFAULT} if the defaults are acceptable. + */ + public void forgetFollowerAsync( + final ForgetFollowerRequest request, + final RequestOptions options, + final ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity( + request, + CcrRequestConverters::forgetFollower, + options, + BroadcastResponse::fromXContent, + listener, + Collections.emptySet()); + } + /** * Stores an auto follow pattern. * diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java index 2e05aee9d7598..a3f5d7e79fda7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java @@ -27,6 +27,7 @@ import org.elasticsearch.client.ccr.DeleteAutoFollowPatternRequest; import org.elasticsearch.client.ccr.FollowInfoRequest; import org.elasticsearch.client.ccr.FollowStatsRequest; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutAutoFollowPatternRequest; @@ -79,6 +80,17 @@ static Request unfollow(UnfollowRequest unfollowRequest) { return new Request(HttpPost.METHOD_NAME, endpoint); } + static Request forgetFollower(final ForgetFollowerRequest forgetFollowerRequest) throws IOException { + final String endpoint = new RequestConverters.EndpointBuilder() + .addPathPart(forgetFollowerRequest.leaderIndex()) + .addPathPartAsIs("_ccr") + .addPathPartAsIs("forget_follower") + .build(); + final Request request = new Request(HttpPost.METHOD_NAME, endpoint); + request.setEntity(createEntity(forgetFollowerRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + static Request putAutoFollowPattern(PutAutoFollowPatternRequest putAutoFollowPatternRequest) throws IOException { String endpoint = new RequestConverters.EndpointBuilder() .addPathPartAsIs("_ccr", "auto_follow") diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ForgetFollowerRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ForgetFollowerRequest.java new file mode 100644 index 0000000000000..3d20a6d934d9d --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ForgetFollowerRequest.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ccr; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +/** + * Represents a forget follower request. Note that this an expert API intended to be used only when unfollowing a follower index fails to + * remove the follower retention leases. Please be sure that you understand the purpose this API before using. + */ +public final class ForgetFollowerRequest implements ToXContentObject, Validatable { + + private final String followerCluster; + + private final String followerIndex; + + private final String followerIndexUUID; + + private final String leaderRemoteCluster; + + private final String leaderIndex; + + /** + * The name of the leader index. + * + * @return the name of the leader index + */ + public String leaderIndex() { + return leaderIndex; + } + + /** + * Construct a forget follower request. + * + * @param followerCluster the name of the cluster containing the follower index to forget + * @param followerIndex the name of follower index + * @param followerIndexUUID the UUID of the follower index + * @param leaderRemoteCluster the alias of the remote cluster containing the leader index from the perspective of the follower index + * @param leaderIndex the name of the leader index + */ + public ForgetFollowerRequest( + final String followerCluster, + final String followerIndex, + final String followerIndexUUID, + final String leaderRemoteCluster, + final String leaderIndex) { + this.followerCluster = Objects.requireNonNull(followerCluster); + this.followerIndex = Objects.requireNonNull(followerIndex); + this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID); + this.leaderRemoteCluster = Objects.requireNonNull(leaderRemoteCluster); + this.leaderIndex = Objects.requireNonNull(leaderIndex); + } + + @Override + public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException { + builder.startObject(); + { + builder.field("follower_cluster", followerCluster); + builder.field("follower_index", followerIndex); + builder.field("follower_index_uuid", followerIndexUUID); + builder.field("leader_remote_cluster", leaderRemoteCluster); + } + builder.endObject(); + return builder; + } + +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/core/BroadcastResponse.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/core/BroadcastResponse.java new file mode 100644 index 0000000000000..3665ba5bf5009 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/core/BroadcastResponse.java @@ -0,0 +1,175 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.core; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Objects; + +/** + * Represents a response to a request that is broadcast to a collection of shards. + */ +public class BroadcastResponse { + + private final Shards shards; + + /** + * Represents the shard-level summary of the response execution. + * + * @return the shard-level response summary + */ + public Shards shards() { + return shards; + } + + BroadcastResponse(final Shards shards) { + this.shards = Objects.requireNonNull(shards); + } + + private static final ParseField SHARDS_FIELD = new ParseField("_shards"); + + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "broadcast_response", + a -> new BroadcastResponse((Shards) a[0])); + + static { + PARSER.declareObject(ConstructingObjectParser.constructorArg(), Shards.SHARDS_PARSER, SHARDS_FIELD); + } + + /** + * Parses a broadcast response. + * + * @param parser the parser + * @return a broadcast response parsed from the specified parser + * @throws IOException if an I/O exception occurs parsing the response + */ + public static BroadcastResponse fromXContent(final XContentParser parser) throws IOException { + return PARSER.parse(parser, null); + } + + /** + * Represents the results of a collection of shards on which a request was executed against. + */ + public static class Shards { + + private final int total; + + /** + * The total number of shards on which a request was executed against. + * + * @return the total number of shards + */ + public int total() { + return total; + } + + private final int successful; + + /** + * The number of successful shards on which a request was executed against. + * + * @return the number of successful shards + */ + public int successful() { + return successful; + } + + private final int skipped; + + /** + * The number of shards skipped by the request. + * + * @return the number of skipped shards + */ + public int skipped() { + return skipped; + } + + private final int failed; + + /** + * The number of shards on which a request failed to be executed against. + * + * @return the number of failed shards + */ + public int failed() { + return failed; + } + + private final Collection failures; + + /** + * The failures corresponding to the shards on which a request failed to be executed against. Note that the number of failures might + * not match {@link #failed()} as some responses group together shard failures. + * + * @return the failures + */ + public Collection failures() { + return failures; + } + + Shards( + final int total, + final int successful, + final int skipped, + final int failed, + final Collection failures) { + this.total = total; + this.successful = successful; + this.skipped = skipped; + this.failed = failed; + this.failures = Collections.unmodifiableCollection(Objects.requireNonNull(failures)); + } + + private static final ParseField TOTAL_FIELD = new ParseField("total"); + private static final ParseField SUCCESSFUL_FIELD = new ParseField("successful"); + private static final ParseField SKIPPED_FIELD = new ParseField("skipped"); + private static final ParseField FAILED_FIELD = new ParseField("failed"); + private static final ParseField FAILURES_FIELD = new ParseField("failures"); + + @SuppressWarnings("unchecked") + static final ConstructingObjectParser SHARDS_PARSER = new ConstructingObjectParser<>( + "shards", + a -> new Shards( + (int) a[0], // total + (int) a[1], // successful + a[2] == null ? 0 : (int) a[2], // skipped + (int) a[3], // failed + a[4] == null ? Collections.emptyList() : (Collection) a[4])); // failures + + static { + SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), TOTAL_FIELD); + SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SUCCESSFUL_FIELD); + SHARDS_PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), SKIPPED_FIELD); + SHARDS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FAILED_FIELD); + SHARDS_PARSER.declareObjectArray( + ConstructingObjectParser.optionalConstructorArg(), + DefaultShardOperationFailedException.PARSER, FAILURES_FIELD); + } + + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index 14e2f977e63d1..13e0af5f0b139 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.client.ccr.FollowInfoResponse; import org.elasticsearch.client.ccr.FollowStatsRequest; import org.elasticsearch.client.ccr.FollowStatsResponse; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse; import org.elasticsearch.client.ccr.IndicesFollowStats; @@ -47,19 +48,24 @@ import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.core.BroadcastResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.yaml.ObjectPath; import org.junit.Before; import java.io.IOException; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -203,6 +209,61 @@ public void testIndexFollowing() throws Exception { assertThat(unfollowResponse.isAcknowledged(), is(true)); } + public void testForgetFollower() throws IOException { + final CcrClient ccrClient = highLevelClient().ccr(); + + final CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader"); + final Map settings = new HashMap<>(3); + final int numberOfShards = randomIntBetween(1, 2); + settings.put("index.number_of_replicas", "0"); + settings.put("index.number_of_shards", Integer.toString(numberOfShards)); + settings.put("index.soft_deletes.enabled", Boolean.TRUE.toString()); + createIndexRequest.settings(settings); + final CreateIndexResponse response = highLevelClient().indices().create(createIndexRequest, RequestOptions.DEFAULT); + assertThat(response.isAcknowledged(), is(true)); + + final PutFollowRequest putFollowRequest = new PutFollowRequest("local_cluster", "leader", "follower", ActiveShardCount.ONE); + final PutFollowResponse putFollowResponse = execute(putFollowRequest, ccrClient::putFollow, ccrClient::putFollowAsync); + assertTrue(putFollowResponse.isFollowIndexCreated()); + assertTrue(putFollowResponse.isFollowIndexShardsAcked()); + assertTrue(putFollowResponse.isIndexFollowingStarted()); + + final String clusterName = highLevelClient().info(RequestOptions.DEFAULT).getClusterName().value(); + + final Request statsRequest = new Request("GET", "/follower/_stats"); + final Response statsResponse = client().performRequest(statsRequest); + final ObjectPath statsObjectPath = ObjectPath.createFromResponse(statsResponse); + final String followerIndexUUID = statsObjectPath.evaluate("indices.follower.uuid"); + + final PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower"); + AcknowledgedResponse pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync); + assertTrue(pauseFollowResponse.isAcknowledged()); + + final ForgetFollowerRequest forgetFollowerRequest = + new ForgetFollowerRequest(clusterName, "follower", followerIndexUUID, "local_cluster", "leader"); + final BroadcastResponse forgetFollowerResponse = + execute(forgetFollowerRequest, ccrClient::forgetFollower, ccrClient::forgetFollowerAsync); + assertThat(forgetFollowerResponse.shards().total(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.shards().successful(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.shards().skipped(), equalTo(0)); + assertThat(forgetFollowerResponse.shards().failed(), equalTo(0)); + assertThat(forgetFollowerResponse.shards().failures(), empty()); + + final Request retentionLeasesRequest = new Request("GET", "/leader/_stats"); + retentionLeasesRequest.addParameter("level", "shards"); + final Response retentionLeasesResponse = client().performRequest(retentionLeasesRequest); + final Map shardsStats = ObjectPath.createFromResponse(retentionLeasesResponse).evaluate("indices.leader.shards"); + assertThat(shardsStats.keySet(), hasSize(numberOfShards)); + for (int i = 0; i < numberOfShards; i++) { + final List shardStats = (List) shardsStats.get(Integer.toString(i)); + assertThat(shardStats, hasSize(1)); + final Map shardStatsAsMap = (Map) shardStats.get(0); + final Map retentionLeasesStats = (Map) shardStatsAsMap.get("retention_leases"); + final List leases = (List) retentionLeasesStats.get("leases"); + assertThat(leases, empty()); + } + } + public void testAutoFollowing() throws Exception { CcrClient ccrClient = highLevelClient().ccr(); PutAutoFollowPatternRequest putAutoFollowPatternRequest = diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java index 7740fff99f68d..393b7b9ba6f20 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CcrRequestConvertersTests.java @@ -29,6 +29,7 @@ import org.elasticsearch.client.ccr.FollowConfig; import org.elasticsearch.client.ccr.FollowInfoRequest; import org.elasticsearch.client.ccr.FollowStatsRequest; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutAutoFollowPatternRequest; @@ -39,9 +40,11 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Locale; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -91,6 +94,20 @@ public void testUnfollow() { assertThat(result.getEntity(), nullValue()); } + public void testForgetFollower() throws IOException { + final ForgetFollowerRequest request = new ForgetFollowerRequest( + randomAlphaOfLength(8), + randomAlphaOfLength(8), + randomAlphaOfLength(8), + randomAlphaOfLength(8), + randomAlphaOfLength(8)); + final Request convertedRequest = CcrRequestConverters.forgetFollower(request); + assertThat(convertedRequest.getMethod(), equalTo(HttpPost.METHOD_NAME)); + assertThat(convertedRequest.getEndpoint(), equalTo("/" + request.leaderIndex() + "/_ccr/forget_follower")); + assertThat(convertedRequest.getParameters().keySet(), empty()); + RequestConvertersTests.assertToXContentBody(request, convertedRequest.getEntity()); + } + public void testPutAutofollowPattern() throws Exception { PutAutoFollowPatternRequest putAutoFollowPatternRequest = new PutAutoFollowPatternRequest(randomAlphaOfLength(4), randomAlphaOfLength(4), Arrays.asList(generateRandomStringArray(4, 4, false))); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/core/BroadcastResponseTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/BroadcastResponseTests.java new file mode 100644 index 0000000000000..96438725d4ef0 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/core/BroadcastResponseTests.java @@ -0,0 +1,90 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.core; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.cluster.ClusterModule; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContent; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.seqno.RetentionLeaseNotFoundException; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.isIn; + +public class BroadcastResponseTests extends ESTestCase { + + public void testFromXContent() throws IOException { + final String index = randomAlphaOfLength(8); + final String id = randomAlphaOfLength(8); + final int total = randomIntBetween(1, 16); + final int successful = total - scaledRandomIntBetween(0, total); + final int failed = scaledRandomIntBetween(0, total - successful); + final List failures = new ArrayList<>(); + final Set shardIds = new HashSet<>(); + for (int i = 0; i < failed; i++) { + final DefaultShardOperationFailedException failure = new DefaultShardOperationFailedException( + index, + randomValueOtherThanMany(shardIds::contains, () -> randomIntBetween(0, total - 1)), + new RetentionLeaseNotFoundException(id)); + failures.add(failure); + shardIds.add(failure.shardId()); + } + + final org.elasticsearch.action.support.broadcast.BroadcastResponse to = + new org.elasticsearch.action.support.broadcast.BroadcastResponse(total, successful, failed, failures); + + final XContentType xContentType = randomFrom(XContentType.values()); + final BytesReference bytes = toShuffledXContent(to, xContentType, ToXContent.EMPTY_PARAMS, randomBoolean()); + + final XContent xContent = XContentFactory.xContent(xContentType); + final XContentParser parser = xContent.createParser( + new NamedXContentRegistry(ClusterModule.getNamedXWriteables()), + LoggingDeprecationHandler.INSTANCE, + bytes.streamInput()); + final BroadcastResponse from = BroadcastResponse.fromXContent(parser); + assertThat(from.shards().total(), equalTo(total)); + assertThat(from.shards().successful(), equalTo(successful)); + assertThat(from.shards().skipped(), equalTo(0)); + assertThat(from.shards().failed(), equalTo(failed)); + assertThat(from.shards().failures(), hasSize(failed == 0 ? failed : 1)); // failures are grouped + if (failed > 0) { + final DefaultShardOperationFailedException groupedFailure = from.shards().failures().iterator().next(); + assertThat(groupedFailure.index(), equalTo(index)); + assertThat(groupedFailure.shardId(), isIn(shardIds)); + assertThat(groupedFailure.reason(), containsString("reason=retention lease with ID [" + id + "] not found")); + } + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 23cdd39787d32..baf8132096cb8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -40,6 +40,7 @@ import org.elasticsearch.client.ccr.FollowInfoResponse; import org.elasticsearch.client.ccr.FollowStatsRequest; import org.elasticsearch.client.ccr.FollowStatsResponse; +import org.elasticsearch.client.ccr.ForgetFollowerRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternRequest; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse; import org.elasticsearch.client.ccr.GetAutoFollowPatternResponse.Pattern; @@ -51,15 +52,18 @@ import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.ccr.UnfollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; +import org.elasticsearch.client.core.BroadcastResponse; import org.elasticsearch.client.indices.CreateIndexRequest; import org.elasticsearch.client.indices.CreateIndexResponse; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.rest.yaml.ObjectPath; import org.junit.Before; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -395,6 +399,101 @@ public void onFailure(Exception e) { assertTrue(latch.await(30L, TimeUnit.SECONDS)); } + public void testForgetFollower() throws InterruptedException, IOException { + final RestHighLevelClient client = highLevelClient(); + final String leaderIndex = "leader"; + { + // create leader index + final CreateIndexRequest createIndexRequest = new CreateIndexRequest(leaderIndex); + final Map settings = new HashMap<>(2); + final int numberOfShards = randomIntBetween(1, 2); + settings.put("index.number_of_shards", Integer.toString(numberOfShards)); + settings.put("index.soft_deletes.enabled", Boolean.TRUE.toString()); + createIndexRequest.settings(settings); + final CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + assertThat(response.isAcknowledged(), is(true)); + } + final String followerIndex = "follower"; + + final PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followerIndex, ActiveShardCount.ONE); + final PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); + assertTrue(putFollowResponse.isFollowIndexCreated()); + assertTrue((putFollowResponse.isFollowIndexShardsAcked())); + assertTrue(putFollowResponse.isIndexFollowingStarted()); + + final PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower"); + AcknowledgedResponse pauseFollowResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT); + assertTrue(pauseFollowResponse.isAcknowledged()); + + final String followerCluster = highLevelClient().info(RequestOptions.DEFAULT).getClusterName().value(); + final Request statsRequest = new Request("GET", "/follower/_stats"); + final Response statsResponse = client().performRequest(statsRequest); + final ObjectPath statsObjectPath = ObjectPath.createFromResponse(statsResponse); + final String followerIndexUUID = statsObjectPath.evaluate("indices.follower.uuid"); + + final String leaderCluster = "local"; + + // tag::ccr-forget-follower-request + final ForgetFollowerRequest request = new ForgetFollowerRequest( + followerCluster, // <1> + followerIndex, // <2> + followerIndexUUID, // <3> + leaderCluster, // <4> + leaderIndex); // <5> + // end::ccr-forget-follower-request + + // tag::ccr-forget-follower-execute + final BroadcastResponse response = client + .ccr() + .forgetFollower(request, RequestOptions.DEFAULT); + // end::ccr-forget-follower-execute + + // tag::ccr-forget-follower-response + final BroadcastResponse.Shards shards = response.shards(); // <1> + final int total = shards.total(); // <2> + final int successful = shards.successful(); // <3> + final int skipped = shards.skipped(); // <4> + final int failed = shards.failed(); // <5> + shards.failures().forEach(failure -> {}); // <6> + // end::ccr-forget-follower-response + + // tag::ccr-forget-follower-execute-listener + ActionListener listener = + new ActionListener() { + + @Override + public void onResponse(final BroadcastResponse response) { + final BroadcastResponse.Shards shards = // <1> + response.shards(); + final int total = shards.total(); + final int successful = shards.successful(); + final int skipped = shards.skipped(); + final int failed = shards.failed(); + shards.failures().forEach(failure -> {}); + } + + @Override + public void onFailure(final Exception e) { + // <2> + } + + }; + // end::ccr-forget-follower-execute-listener + + // replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::ccr-forget-follower-execute-async + client.ccr().forgetFollowerAsync( + request, + RequestOptions.DEFAULT, + listener); // <1> + // end::ccr-forget-follower-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + public void testPutAutoFollowPattern() throws Exception { RestHighLevelClient client = highLevelClient(); diff --git a/docs/java-rest/high-level/ccr/forget_follower.asciidoc b/docs/java-rest/high-level/ccr/forget_follower.asciidoc new file mode 100644 index 0000000000000..bf1fde014b8e6 --- /dev/null +++ b/docs/java-rest/high-level/ccr/forget_follower.asciidoc @@ -0,0 +1,45 @@ +-- +:api: ccr-forget-follower +:request: ForgetFollowerRequest +:response: BroadcastResponse +-- + +[id="{upid}-{api}"] +=== Forget Follower API + +[id="{upid}-{api}-request"] +==== Request + +The Forget Follower API allows you to manually remove the follower retention +leases from the leader. Note that these retention leases are automatically +managed by the following index. This API exists only for cases when invoking +the unfollow API on the follower index is unable to remove the follower +retention leases. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-request] +-------------------------------------------------- +<1> The name of the cluster containing the follower index. +<2> The name of the follower index. +<3> The UUID of the follower index (can be obtained from index stats). +<4> The alias of the remote cluster containing the leader index. +<5> The name of the leader index. + +[id="{upid}-{api}-response"] +==== Response + +The returned +{response}+ indicates if the response was successful. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-response] +-------------------------------------------------- +<1> The high-level shards summary. +<2> The total number of shards the request was executed on. +<3> The total number of shards the request was successful on. +<4> The total number of shards the request was skipped on (should always be zero). +<5> The total number of shards the request failed on. +<6> The shard-level failures. + +include::../execution.asciidoc[] diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 1f3bee46cc858..80b827c4e5a4f 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -502,6 +502,7 @@ The Java High Level REST Client supports the following CCR APIs: * <<{upid}-ccr-pause-follow>> * <<{upid}-ccr-resume-follow>> * <<{upid}-ccr-unfollow>> +* <<{upid}-ccr-forget-follower>> * <<{upid}-ccr-put-auto-follow-pattern>> * <<{upid}-ccr-delete-auto-follow-pattern>> * <<{upid}-ccr-get-auto-follow-pattern>> diff --git a/docs/reference/ccr/apis/ccr-apis.asciidoc b/docs/reference/ccr/apis/ccr-apis.asciidoc index c7c5194790360..3a745f239867d 100644 --- a/docs/reference/ccr/apis/ccr-apis.asciidoc +++ b/docs/reference/ccr/apis/ccr-apis.asciidoc @@ -19,6 +19,7 @@ You can use the following APIs to perform {ccr} operations. * <> * <> * <> +* <> * <> * <> @@ -38,6 +39,7 @@ include::follow/put-follow.asciidoc[] include::follow/post-pause-follow.asciidoc[] include::follow/post-resume-follow.asciidoc[] include::follow/post-unfollow.asciidoc[] +include::follow/post-forget-follower.asciidoc[] include::follow/get-follow-stats.asciidoc[] include::follow/get-follow-info.asciidoc[] diff --git a/docs/reference/ccr/apis/follow/post-forget-follower.asciidoc b/docs/reference/ccr/apis/follow/post-forget-follower.asciidoc new file mode 100644 index 0000000000000..5d5fb6a218449 --- /dev/null +++ b/docs/reference/ccr/apis/follow/post-forget-follower.asciidoc @@ -0,0 +1,152 @@ +[role="xpack"] +[testenv="platinum"] +[[ccr-post-forget-follower]] +=== Forget Follower API +++++ +Forget Follower +++++ + +Removes the follower retention leases from the leader. + +==== Description + +A following index takes out retention leases on its leader index. These +retention leases are used to increase the likelihood that the shards of the +leader index retain the history of operations that the shards of the following +index need to execute replication. When a follower index is converted to a +regular index via the <> (either via explicit +execution of this API, or implicitly via {ilm}), these retention leases are +removed. However, removing these retention leases can fail (e.g., if the remote +cluster containing the leader index is unavailable). While these retention +leases will eventually expire on their own, their extended existence can cause +the leader index to hold more history than necessary, and prevent {ilm} from +performing some operations on the leader index. This API exists to enable +manually removing these retention leases when the unfollow API was unable to do +so. + +NOTE: This API does not stop replication by a following index. If you use this +API targeting a follower index that is still actively following, the following +index will add back retention leases on the leader. The only purpose of this API +is to handle the case of failure to remove the following retention leases after +the <> is invoked. + +==== Request + +////////////////////////// + +[source,js] +-------------------------------------------------- +PUT /follower_index/_ccr/follow?wait_for_active_shards=1 +{ + "remote_cluster" : "remote_cluster", + "leader_index" : "leader_index" +} +-------------------------------------------------- +// CONSOLE +// TESTSETUP +// TEST[setup:remote_cluster_and_leader_index] + +[source,js] +-------------------------------------------------- +POST /follower_index/_ccr/pause_follow +-------------------------------------------------- +// CONSOLE +// TEARDOWN + +////////////////////////// + +[source,js] +-------------------------------------------------- +POST //_ccr/forget_follower +{ + "follower_cluster" : "", + "follower_index" : "", + "follower_index_uuid" : "", + "leader_remote_cluster" : "" +} +-------------------------------------------------- +// CONSOLE +// TEST[s//leader_index/] +// TEST[s//follower_cluster/] +// TEST[s//follower_index/] +// TEST[s//follower_index_uuid/] +// TEST[s//leader_remote_cluster/] +// TEST[skip_shard_failures] + +[source,js] +-------------------------------------------------- +{ + "_shards" : { + "total" : 1, + "successful" : 1, + "failed" : 0, + "failures" : [ ] + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"total" : 1/"total" : $body._shards.total/] +// TESTRESPONSE[s/"successful" : 1/"successful" : $body._shards.successful/] +// TESTRESPONSE[s/"failed" : 0/"failed" : $body._shards.failed/] +// TESTRESPONSE[s/"failures" : \[ \]/"failures" : $body._shards.failures/] + +==== Path Parameters + +`leader_index` (required):: + (string) the name of the leader index + +==== Request Body +`follower_cluster` (required):: + (string) the name of the cluster containing the follower index + +`follower_index` (required):: + (string) the name of the follower index + +`follower_index_uuid` (required):: + (string) the UUID of the follower index + +`leader_remote_cluster` (required):: + (string) the alias (from the perspective of the cluster containing the + follower index) of the <> containing + the leader index + +==== Authorization + +If the {es} {security-features} are enabled, you must have `manage_leader_index` +index privileges for the leader index. For more information, see +{stack-ov}/security-privileges.html[Security privileges]. + +==== Example + +This example removes the follower retention leases for `follower_index` from +`leader_index`. + +[source,js] +-------------------------------------------------- +POST /leader_index/_ccr/forget_follower +{ + "follower_cluster" : "", + "follower_index" : "follower_index", + "follower_index_uuid" : "", + "leader_remote_cluster" : "" +} +-------------------------------------------------- +// CONSOLE +// TEST[skip_shard_failures] + +The API returns the following result: + +[source,js] +-------------------------------------------------- +{ + "_shards" : { + "total" : 1, + "successful" : 1, + "failed" : 0, + "failures" : [ ] + } +} +-------------------------------------------------- +// TESTRESPONSE[s/"total" : 1/"total" : $body._shards.total/] +// TESTRESPONSE[s/"successful" : 1/"successful" : $body._shards.successful/] +// TESTRESPONSE[s/"failed" : 0/"failed" : $body._shards.failed/] +// TESTRESPONSE[s/"failures" : \[ \]/"failures" : $body._shards.failures/] diff --git a/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java b/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java index d297df478a4b8..85d8a2c1a38db 100644 --- a/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java +++ b/server/src/main/java/org/elasticsearch/action/support/DefaultShardOperationFailedException.java @@ -41,7 +41,7 @@ public class DefaultShardOperationFailedException extends ShardOperationFailedEx private static final String SHARD_ID = "shard"; private static final String REASON = "reason"; - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "failures", true, arg -> new DefaultShardOperationFailedException((String) arg[0], (int) arg[1] ,(Throwable) arg[2])); static { diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/forget_follower.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/forget_follower.yml new file mode 100644 index 0000000000000..08475a0026aef --- /dev/null +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/forget_follower.yml @@ -0,0 +1,80 @@ +--- +"Test forget follower": + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.remote_cluster.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.remote_cluster.seeds: $local_ip}} + + - do: + indices.create: + index: leader_index + body: + settings: + index: + number_of_shards: 1 + soft_deletes: + enabled: true + - is_true: acknowledged + + - do: + ccr.follow: + index: follower_index + wait_for_active_shards: 1 + body: + remote_cluster: remote_cluster + leader_index: leader_index + - is_true: follow_index_created + - is_true: follow_index_shards_acked + - is_true: index_following_started + + - do: + info: {} + + - set: {cluster_name: cluster_name} + + - do: + indices.stats: {index: follower_index} + + - set: {indices.follower_index.uuid: follower_index_uuid} + + - do: + ccr.forget_follower: + index: leader_index + body: + follower_cluster: $cluster_name + follower_index: follower_index + follower_index_uuid: $follower_index_uuid + leader_remote_cluster: remote_cluster + - match: { _shards.total: 1 } + - match: { _shards.successful: 1} + - match: { _shards.failed: 0} + - is_false: _shards.failure + + - do: + ccr.pause_follow: + index: follower_index + - is_true: acknowledged + + - do: + indices.close: + index: follower_index + - is_true: acknowledged + + - do: + ccr.unfollow: + index: follower_index + - is_true: acknowledged diff --git a/x-pack/plugin/ccr/qa/security/build.gradle b/x-pack/plugin/ccr/qa/security/build.gradle index af4238c20075e..e1a735e0b2668 100644 --- a/x-pack/plugin/ccr/qa/security/build.gradle +++ b/x-pack/plugin/ccr/qa/security/build.gradle @@ -22,7 +22,7 @@ leaderClusterTestCluster { setupCommand 'setupTestAdmin', 'bin/elasticsearch-users', 'useradd', "test_admin", '-p', 'x-pack-test-password', '-r', "superuser" setupCommand 'setupCcrUser', - 'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "manage_ccr" + 'bin/elasticsearch-users', 'useradd', "test_ccr", '-p', 'x-pack-test-password', '-r', "ccruser" waitCondition = { node, ant -> File tmpFile = new File(node.cwd, 'wait.success') ant.get(src: "http://${node.httpUri()}/_cluster/health?wait_for_nodes=>=${numNodes}&wait_for_status=yellow", diff --git a/x-pack/plugin/ccr/qa/security/follower-roles.yml b/x-pack/plugin/ccr/qa/security/follower-roles.yml index be3e6cf5e1755..4a91c072043bb 100644 --- a/x-pack/plugin/ccr/qa/security/follower-roles.yml +++ b/x-pack/plugin/ccr/qa/security/follower-roles.yml @@ -2,7 +2,7 @@ ccruser: cluster: - manage_ccr indices: - - names: [ 'allowed-index', 'logs-eu-*' ] + - names: [ 'allowed-index', 'forget-follower', 'logs-eu-*' ] privileges: - monitor - read diff --git a/x-pack/plugin/ccr/qa/security/leader-roles.yml b/x-pack/plugin/ccr/qa/security/leader-roles.yml index 99fa62cbe832b..944af38b92ce5 100644 --- a/x-pack/plugin/ccr/qa/security/leader-roles.yml +++ b/x-pack/plugin/ccr/qa/security/leader-roles.yml @@ -2,7 +2,8 @@ ccruser: cluster: - read_ccr indices: - - names: [ 'allowed-index', 'logs-eu-*' ] + - names: [ 'allowed-index', 'forget-leader', 'logs-eu-*' ] privileges: - monitor - read + - manage_leader_index diff --git a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 91b94f1c4b57d..cb54248ee3dbc 100644 --- a/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ccr; import org.elasticsearch.client.Request; +import org.elasticsearch.client.Response; import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.RestClient; import org.elasticsearch.common.Strings; @@ -13,14 +14,19 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.support.XContentMapValues; +import org.elasticsearch.test.rest.yaml.ObjectPath; +import java.io.IOException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; public class FollowIndexSecurityIT extends ESCCRRestTestCase { @@ -176,4 +182,55 @@ public void testAutoFollowPatterns() throws Exception { pauseFollow(client(), allowedIndex); } + public void testForgetFollower() throws IOException { + final String forgetLeader = "forget-leader"; + final String forgetFollower = "forget-follower"; + if ("leader".equals(targetCluster)) { + logger.info("running against leader cluster"); + final Settings indexSettings = Settings.builder() + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + .put("index.soft_deletes.enabled", true) + .build(); + createIndex(forgetLeader, indexSettings); + } else { + logger.info("running against follower cluster"); + followIndex(client(), "leader_cluster", forgetLeader, forgetFollower); + + final Response response = client().performRequest(new Request("GET", "/" + forgetFollower + "/_stats")); + final String followerIndexUUID = ObjectPath.createFromResponse(response).evaluate("indices." + forgetFollower + ".uuid"); + + assertOK(client().performRequest(new Request("POST", "/" + forgetFollower + "/_ccr/pause_follow"))); + + try (RestClient leaderClient = buildLeaderClient(restClientSettings())) { + final Request request = new Request("POST", "/" + forgetLeader + "/_ccr/forget_follower"); + final String requestBody = "{" + + "\"follower_cluster\":\"follow-cluster\"," + + "\"follower_index\":\"" + forgetFollower + "\"," + + "\"follower_index_uuid\":\"" + followerIndexUUID + "\"," + + "\"leader_remote_cluster\":\"leader_cluster\"" + + "}"; + request.setJsonEntity(requestBody); + final Response forgetFollowerResponse = leaderClient.performRequest(request); + assertOK(forgetFollowerResponse); + final Map shards = ObjectPath.createFromResponse(forgetFollowerResponse).evaluate("_shards"); + assertNull(shards.get("failures")); + assertThat(shards.get("total"), equalTo(1)); + assertThat(shards.get("successful"), equalTo(1)); + assertThat(shards.get("failed"), equalTo(0)); + + final Request retentionLeasesRequest = new Request("GET", "/" + forgetLeader + "/_stats"); + retentionLeasesRequest.addParameter("level", "shards"); + final Response retentionLeasesResponse = leaderClient.performRequest(retentionLeasesRequest); + final ArrayList shardsStats = + ObjectPath.createFromResponse(retentionLeasesResponse).evaluate("indices." + forgetLeader + ".shards.0"); + assertThat(shardsStats, hasSize(1)); + final Map shardStatsAsMap = (Map) shardsStats.get(0); + final Map retentionLeasesStats = (Map) shardStatsAsMap.get("retention_leases"); + final List leases = (List) retentionLeasesStats.get("leases"); + assertThat(leases, empty()); + } + } + } + } diff --git a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java index 4891c51049b62..3d5c8610a1af4 100644 --- a/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java +++ b/x-pack/plugin/ccr/qa/src/main/java/org/elasticsearch/xpack/ccr/ESCCRRestTestCase.java @@ -255,16 +255,25 @@ protected RestClient buildLeaderClient() throws IOException { return buildClient(System.getProperty("tests.leader_host")); } + protected RestClient buildLeaderClient(final Settings settings) throws IOException { + assert "leader".equals(targetCluster) == false; + return buildClient(System.getProperty("tests.leader_host"), settings); + } + protected RestClient buildMiddleClient() throws IOException { assert "middle".equals(targetCluster) == false; return buildClient(System.getProperty("tests.middle_host")); } private RestClient buildClient(final String url) throws IOException { + return buildClient(url, restAdminSettings()); + } + + private RestClient buildClient(final String url, final Settings settings) throws IOException { int portSeparator = url.lastIndexOf(':'); HttpHost httpHost = new HttpHost(url.substring(0, portSeparator), - Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); - return buildClient(restAdminSettings(), new HttpHost[]{httpHost}); + Integer.parseInt(url.substring(portSeparator + 1)), getProtocol()); + return buildClient(settings, new HttpHost[]{httpHost}); } } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index 0e6854652aa16..3526bce13c593 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -55,6 +55,7 @@ import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction; import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; +import org.elasticsearch.xpack.ccr.action.TransportForgetFollowerAction; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; import org.elasticsearch.xpack.ccr.action.TransportPutAutoFollowPatternAction; @@ -75,6 +76,7 @@ import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; +import org.elasticsearch.xpack.ccr.rest.RestForgetFollowerAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; import org.elasticsearch.xpack.ccr.rest.RestPutAutoFollowPatternAction; @@ -88,6 +90,7 @@ import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; @@ -223,7 +226,9 @@ public List> getPersistentTasksExecutor(ClusterServic // auto-follow actions new ActionHandler<>(DeleteAutoFollowPatternAction.INSTANCE, TransportDeleteAutoFollowPatternAction.class), new ActionHandler<>(PutAutoFollowPatternAction.INSTANCE, TransportPutAutoFollowPatternAction.class), - new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class)); + new ActionHandler<>(GetAutoFollowPatternAction.INSTANCE, TransportGetAutoFollowPatternAction.class), + // forget follower action + new ActionHandler<>(ForgetFollowerAction.INSTANCE, TransportForgetFollowerAction.class)); } public List getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, @@ -247,7 +252,9 @@ public List getRestHandlers(Settings settings, RestController restC // auto-follow APIs new RestDeleteAutoFollowPatternAction(settings, restController), new RestPutAutoFollowPatternAction(settings, restController), - new RestGetAutoFollowPatternAction(settings, restController)); + new RestGetAutoFollowPatternAction(settings, restController), + // forget follower API + new RestForgetFollowerAction(settings, restController)); } public List getNamedWriteables() { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java new file mode 100644 index 0000000000000..5656450f4c696 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportForgetFollowerAction.java @@ -0,0 +1,151 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.Assertions; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.node.TransportBroadcastByNodeAction; +import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.routing.GroupShardsIterator; +import org.elasticsearch.cluster.routing.PlainShardsIterator; +import org.elasticsearch.cluster.routing.ShardIterator; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardsIterator; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.CcrRetentionLeases; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutionException; + +public class TransportForgetFollowerAction extends TransportBroadcastByNodeAction< + ForgetFollowerAction.Request, + BroadcastResponse, + TransportBroadcastByNodeAction.EmptyResult> { + + private final ClusterService clusterService; + private final IndicesService indicesService; + + @Inject + public TransportForgetFollowerAction( + final ClusterService clusterService, + final TransportService transportService, + final ActionFilters actionFilters, + final IndexNameExpressionResolver indexNameExpressionResolver, + final IndicesService indicesService) { + super( + ForgetFollowerAction.NAME, + Objects.requireNonNull(clusterService), + Objects.requireNonNull(transportService), + Objects.requireNonNull(actionFilters), + Objects.requireNonNull(indexNameExpressionResolver), + ForgetFollowerAction.Request::new, + ThreadPool.Names.MANAGEMENT); + this.clusterService = clusterService; + this.indicesService = Objects.requireNonNull(indicesService); + } + + @Override + protected EmptyResult readShardResult(final StreamInput in) { + return EmptyResult.readEmptyResultFrom(in); + } + + @Override + protected BroadcastResponse newResponse( + final ForgetFollowerAction.Request request, + final int totalShards, + final int successfulShards, + final int failedShards, List emptyResults, + final List shardFailures, + final ClusterState clusterState) { + return new BroadcastResponse(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + protected ForgetFollowerAction.Request readRequestFrom(final StreamInput in) throws IOException { + return new ForgetFollowerAction.Request(in); + } + + @Override + protected EmptyResult shardOperation(final ForgetFollowerAction.Request request, final ShardRouting shardRouting) { + final Index followerIndex = new Index(request.followerIndex(), request.followerIndexUUID()); + final Index leaderIndex = clusterService.state().metaData().index(request.leaderIndex()).getIndex(); + final String id = CcrRetentionLeases.retentionLeaseId( + request.followerCluster(), + followerIndex, + request.leaderRemoteCluster(), + leaderIndex); + + final IndexShard indexShard = indicesService.indexServiceSafe(leaderIndex).getShard(shardRouting.shardId().id()); + + final PlainActionFuture permit = new PlainActionFuture<>(); + indexShard.acquirePrimaryOperationPermit(permit, ThreadPool.Names.SAME, request); + try (Releasable ignored = permit.get()) { + final PlainActionFuture future = new PlainActionFuture<>(); + indexShard.removeRetentionLease(id, future); + future.get(); + } catch (final ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + + return EmptyResult.INSTANCE; + } + + @Override + protected ShardsIterator shards( + final ClusterState clusterState, + final ForgetFollowerAction.Request request, + final String[] concreteIndices) { + final GroupShardsIterator activePrimaryShards = + clusterState.routingTable().activePrimaryShardsGrouped(concreteIndices, false); + final List shardRoutings = new ArrayList<>(); + final Iterator it = activePrimaryShards.iterator(); + while (it.hasNext()) { + final ShardIterator shardIterator = it.next(); + final ShardRouting primaryShard = shardIterator.nextOrNull(); + assert primaryShard != null; + shardRoutings.add(primaryShard); + if (Assertions.ENABLED) { + final ShardRouting maybeNextPrimaryShard = shardIterator.nextOrNull(); + assert maybeNextPrimaryShard == null : maybeNextPrimaryShard; + } + } + return new PlainShardsIterator(shardRoutings); + } + + @Override + protected ClusterBlockException checkGlobalBlock(final ClusterState state, final ForgetFollowerAction.Request request) { + return null; + } + + @Override + protected ClusterBlockException checkRequestBlock( + final ClusterState state, + final ForgetFollowerAction.Request request, + final String[] concreteIndices) { + return null; + } + +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestForgetFollowerAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestForgetFollowerAction.java new file mode 100644 index 0000000000000..dc39aea372d81 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestForgetFollowerAction.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.BytesRestResponse; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; + +import java.io.IOException; +import java.util.Objects; + +public class RestForgetFollowerAction extends BaseRestHandler { + + public RestForgetFollowerAction(final Settings settings, final RestController restController) { + super(Objects.requireNonNull(settings)); + Objects.requireNonNull(restController); + restController.registerHandler(RestRequest.Method.POST, "/{index}/_ccr/forget_follower", this); + } + + @Override + public String getName() { + return "forget_follower_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) { + final String leaderIndex = restRequest.param("index"); + + return channel -> { + try (XContentParser parser = restRequest.contentOrSourceParamParser()) { + final ForgetFollowerAction.Request request = ForgetFollowerAction.Request.fromXContent(parser, leaderIndex); + client.execute(ForgetFollowerAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } catch (final IOException e) { + channel.sendResponse(new BytesRestResponse(channel, RestStatus.INTERNAL_SERVER_ERROR, e)); + } + }; + + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java index 44b38ddbadfa0..5a93291dacd1b 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrRetentionLeaseIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.ShardRouting; @@ -53,6 +54,7 @@ import org.elasticsearch.xpack.CcrIntegTestCase; import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction; import org.elasticsearch.xpack.ccr.repository.CcrRepository; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; @@ -80,6 +82,7 @@ import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.emptyArray; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasSize; @@ -918,6 +921,79 @@ public void onResponseReceived( } } + public void testForgetFollower() throws Exception { + final String leaderIndex = "leader"; + final String followerIndex = "follower"; + final int numberOfShards = randomIntBetween(1, 4); + final String leaderIndexSettings = + getIndexSettings(numberOfShards, 0, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); + assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON).get()); + ensureLeaderYellow(leaderIndex); + final PutFollowAction.Request followRequest = putFollow(leaderIndex, followerIndex); + followerClient().execute(PutFollowAction.INSTANCE, followRequest).get(); + + ensureFollowerGreen(true, followerIndex); + + pauseFollow(followerIndex); + followerClient().admin().indices().close(new CloseIndexRequest(followerIndex)).actionGet(); + + final ClusterStateResponse followerClusterState = followerClient().admin().cluster().prepareState().clear().setNodes(true).get(); + try { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.addSendBehavior( + (connection, requestId, action, request, options) -> { + if (RetentionLeaseActions.Remove.ACTION_NAME.equals(action) + || TransportActionProxy.getProxyAction(RetentionLeaseActions.Remove.ACTION_NAME).equals(action)) { + final RetentionLeaseActions.RemoveRequest removeRequest = (RetentionLeaseActions.RemoveRequest) request; + if (randomBoolean()) { + throw new ConnectTransportException(connection.getNode(), "connection failed"); + } else { + throw new IndexShardClosedException(removeRequest.getShardId()); + } + } + connection.sendRequest(requestId, action, request, options); + }); + } + + expectThrows( + ElasticsearchException.class, + () -> followerClient().execute(UnfollowAction.INSTANCE, new UnfollowAction.Request(followerIndex)).actionGet()); + + final ClusterStateResponse followerIndexClusterState = + followerClient().admin().cluster().prepareState().clear().setMetaData(true).setIndices(followerIndex).get(); + final String followerUUID = followerIndexClusterState.getState().metaData().index(followerIndex).getIndexUUID(); + + final BroadcastResponse forgetFollowerResponse = leaderClient().execute( + ForgetFollowerAction.INSTANCE, + new ForgetFollowerAction.Request( + getFollowerCluster().getClusterName(), + followerIndex, + followerUUID, + "leader_cluster", + leaderIndex)).actionGet(); + + assertThat(forgetFollowerResponse.getTotalShards(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.getSuccessfulShards(), equalTo(numberOfShards)); + assertThat(forgetFollowerResponse.getFailedShards(), equalTo(0)); + assertThat(forgetFollowerResponse.getShardFailures(), emptyArray()); + + final IndicesStatsResponse afterForgetFollowerStats = + leaderClient().admin().indices().stats(new IndicesStatsRequest().clear().indices(leaderIndex)).actionGet(); + final List afterForgetFollowerShardsStats = getShardsStats(afterForgetFollowerStats); + for (final ShardStats shardStats : afterForgetFollowerShardsStats) { + assertThat(shardStats.getRetentionLeaseStats().retentionLeases().leases(), empty()); + } + } finally { + for (final ObjectCursor senderNode : followerClusterState.getState().nodes().getDataNodes().values()) { + final MockTransportService senderTransportService = + (MockTransportService) getFollowerCluster().getInstance(TransportService.class, senderNode.value.getName()); + senderTransportService.clearAllRules(); + } + } + } + private void assertRetentionLeaseRenewal( final int numberOfShards, final int numberOfReplicas, diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ForgetFollowerAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ForgetFollowerAction.java new file mode 100644 index 0000000000000..d2a0b565496d6 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/ForgetFollowerAction.java @@ -0,0 +1,171 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.broadcast.BroadcastRequest; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +public class ForgetFollowerAction extends Action { + + public static final String NAME = "indices:admin/xpack/ccr/forget_follower"; + public static final ForgetFollowerAction INSTANCE = new ForgetFollowerAction(); + + private ForgetFollowerAction() { + super(NAME); + } + + @Override + public BroadcastResponse newResponse() { + return new BroadcastResponse(); + } + + /** + * Represents a forget follower request. Note that this an expert API intended to be used only when unfollowing a follower index fails + * to emove the follower retention leases. Please be sure that you understand the purpose this API before using. + */ + public static class Request extends BroadcastRequest { + + private static final ParseField FOLLOWER_CLUSTER = new ParseField("follower_cluster"); + private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index"); + private static final ParseField FOLLOWER_INDEX_UUID = new ParseField("follower_index_uuid"); + private static final ParseField LEADER_REMOTE_CLUSTER = new ParseField("leader_remote_cluster"); + + private static final ObjectParser PARSER = new ObjectParser<>(NAME, () -> new String[4]); + + static { + PARSER.declareString((parameters, value) -> parameters[0] = value, FOLLOWER_CLUSTER); + PARSER.declareString((parameters, value) -> parameters[1] = value, FOLLOWER_INDEX); + PARSER.declareString((parameters, value) -> parameters[2] = value, FOLLOWER_INDEX_UUID); + PARSER.declareString((parameters, value) -> parameters[3] = value, LEADER_REMOTE_CLUSTER); + } + + public static ForgetFollowerAction.Request fromXContent( + final XContentParser parser, + final String leaderIndex) throws IOException { + final String[] parameters = PARSER.parse(parser, null); + return new Request(parameters[0], parameters[1], parameters[2], parameters[3], leaderIndex); + } + + private String followerCluster; + + /** + * The name of the cluster containing the follower index. + * + * @return the name of the cluster containing the follower index + */ + public String followerCluster() { + return followerCluster; + } + + private String followerIndex; + + /** + * The name of the follower index. + * + * @return the name of the follower index + */ + public String followerIndex() { + return followerIndex; + } + + private String followerIndexUUID; + + /** + * The UUID of the follower index. + * + * @return the UUID of the follower index + */ + public String followerIndexUUID() { + return followerIndexUUID; + } + + private String leaderRemoteCluster; + + /** + * The alias of the remote cluster containing the leader index. + * + * @return the alias of the remote cluster + */ + public String leaderRemoteCluster() { + return leaderRemoteCluster; + } + + private String leaderIndex; + + /** + * The name of the leader index. + * + * @return the name of the leader index + */ + public String leaderIndex() { + return leaderIndex; + } + + public Request() { + + } + + /** + * Construct a forget follower request. + * + * @param followerCluster the name of the cluster containing the follower index to forget + * @param followerIndex the name of follower index + * @param followerIndexUUID the UUID of the follower index + * @param leaderRemoteCluster the alias of the remote cluster containing the leader index from the perspective of the follower index + * @param leaderIndex the name of the leader index + */ + public Request( + final String followerCluster, + final String followerIndex, + final String followerIndexUUID, + final String leaderRemoteCluster, + final String leaderIndex) { + super(new String[]{leaderIndex}); + this.followerCluster = Objects.requireNonNull(followerCluster); + this.leaderIndex = Objects.requireNonNull(leaderIndex); + this.leaderRemoteCluster = Objects.requireNonNull(leaderRemoteCluster); + this.followerIndex = Objects.requireNonNull(followerIndex); + this.followerIndexUUID = Objects.requireNonNull(followerIndexUUID); + } + + public Request(final StreamInput in) throws IOException { + super.readFrom(in); + followerCluster = in.readString(); + leaderIndex = in.readString(); + leaderRemoteCluster = in.readString(); + followerIndex = in.readString(); + followerIndexUUID = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(followerCluster); + out.writeString(leaderIndex); + out.writeString(leaderRemoteCluster); + out.writeString(followerIndex); + out.writeString(followerIndexUUID); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java index f35a14314338c..00b115131d1fb 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/client/CcrClient.java @@ -9,11 +9,13 @@ import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; @@ -96,6 +98,16 @@ public ActionFuture unfollow(final UnfollowAction.Request return listener; } + public void forgetFollower(final ForgetFollowerAction.Request request, final ActionListener listener) { + client.execute(ForgetFollowerAction.INSTANCE, request, listener); + } + + public ActionFuture forgetFollower(final ForgetFollowerAction.Request request) { + final PlainActionFuture listener = PlainActionFuture.newFuture(); + client.execute(ForgetFollowerAction.INSTANCE, request, listener); + return listener; + } + public void putAutoFollowPattern( final PutAutoFollowPatternAction.Request request, final ActionListener listener) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java index d24863d6d53c4..e20e76ee47b37 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/privilege/IndexPrivilege.java @@ -22,6 +22,7 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryAction; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.xpack.core.ccr.action.ForgetFollowerAction; import org.elasticsearch.xpack.core.ccr.action.PutFollowAction; import org.elasticsearch.xpack.core.ccr.action.UnfollowAction; import org.elasticsearch.xpack.core.indexlifecycle.action.ExplainLifecycleAction; @@ -62,6 +63,7 @@ public final class IndexPrivilege extends Privilege { ExplainLifecycleAction.NAME); private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME, UnfollowAction.NAME, CloseIndexAction.NAME + "*"); + private static final Automaton MANAGE_LEADER_INDEX_AUTOMATON = patterns(ForgetFollowerAction.NAME + "*"); private static final Automaton MANAGE_ILM_AUTOMATON = patterns("indices:admin/ilm/*"); public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY); @@ -78,6 +80,7 @@ public final class IndexPrivilege extends Privilege { public static final IndexPrivilege CREATE_INDEX = new IndexPrivilege("create_index", CREATE_INDEX_AUTOMATON); public static final IndexPrivilege VIEW_METADATA = new IndexPrivilege("view_index_metadata", VIEW_METADATA_AUTOMATON); public static final IndexPrivilege MANAGE_FOLLOW_INDEX = new IndexPrivilege("manage_follow_index", MANAGE_FOLLOW_INDEX_AUTOMATON); + public static final IndexPrivilege MANAGE_LEADER_INDEX = new IndexPrivilege("manage_leader_index", MANAGE_LEADER_INDEX_AUTOMATON); public static final IndexPrivilege MANAGE_ILM = new IndexPrivilege("manage_ilm", MANAGE_ILM_AUTOMATON); private static final Map VALUES = MapBuilder.newMapBuilder() @@ -95,6 +98,7 @@ public final class IndexPrivilege extends Privilege { .put("view_index_metadata", VIEW_METADATA) .put("read_cross_cluster", READ_CROSS_CLUSTER) .put("manage_follow_index", MANAGE_FOLLOW_INDEX) + .put("manage_leader_index", MANAGE_LEADER_INDEX) .put("manage_ilm", MANAGE_ILM) .immutableMap(); diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.forget_follower.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.forget_follower.json new file mode 100644 index 0000000000000..92d38e5e999e8 --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.forget_follower.json @@ -0,0 +1,21 @@ +{ + "ccr.forget_follower": { + "documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/current", + "methods": [ "POST" ], + "url": { + "path": "/{index}/_ccr/forget_follower", + "paths": [ "/{index}/_ccr/forget_follower" ], + "parts": { + "index": { + "type": "string", + "required": true, + "description": "the name of the leader index for which specified follower retention leases should be removed" + } + } + }, + "body": { + "description" : "the name and UUID of the follower index, the name of the cluster containing the follower index, and the alias from the perspective of that cluster for the remote cluster containing the leader index", + "required" : true + } + } +}