From 9ba12488a9a192b610af95792a19ee06ab3fe9e9 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 16 Nov 2018 15:09:23 +0100 Subject: [PATCH 1/2] [HLRC] Added support for CCR Resume Follow API This change also adds documentation for the Resume Follow API Relates to #33824 --- .../org/elasticsearch/client/CcrClient.java | 47 +++- .../client/CcrRequestConverters.java | 11 + .../client/ccr/FollowConfig.java | 203 ++++++++++++++++++ .../client/ccr/PutFollowRequest.java | 161 +------------- .../client/ccr/ResumeFollowRequest.java | 65 ++++++ .../java/org/elasticsearch/client/CCRIT.java | 20 +- .../client/ccr/ResumeFollowRequestTests.java | 116 ++++++++++ .../documentation/CCRDocumentationIT.java | 83 ++++++- .../high-level/ccr/resume_follow.asciidoc | 35 +++ .../high-level/supported-apis.asciidoc | 2 + 10 files changed, 579 insertions(+), 164 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/FollowConfig.java create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java create mode 100644 docs/java-rest/high-level/ccr/resume_follow.asciidoc diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java index 68933093ae794..fb91e16015a45 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrClient.java @@ -23,6 +23,7 @@ import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutFollowRequest; import org.elasticsearch.client.ccr.PutFollowResponse; +import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; import java.io.IOException; @@ -89,7 +90,7 @@ public void putFollowAsync(PutFollowRequest request, } /** - * Instructs a follower index the pause the following of a leader index. + * Instructs a follower index to pause the following of a leader index. * * See * the docs for more. @@ -110,7 +111,7 @@ public AcknowledgedResponse pauseFollow(PauseFollowRequest request, RequestOptio } /** - * Asynchronously instruct a follower index the pause the following of a leader index. + * Asynchronously instruct a follower index to pause the following of a leader index. * * See * the docs for more. @@ -130,4 +131,46 @@ public void pauseFollowAsync(PauseFollowRequest request, Collections.emptySet()); } + /** + * Instructs a follower index to resume the following of a leader index. + * + * See + * the docs for more. + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + * @throws IOException in case there is a problem sending the request or parsing back the response + */ + public AcknowledgedResponse resumeFollow(ResumeFollowRequest request, RequestOptions options) throws IOException { + return restHighLevelClient.performRequestAndParseEntity( + request, + CcrRequestConverters::resumeFollow, + options, + AcknowledgedResponse::fromXContent, + Collections.emptySet() + ); + } + + /** + * Asynchronously instruct a follower index to resume the following of a leader index. + * + * See + * the docs for more. + * + * @param request the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + */ + public void resumeFollowAsync(ResumeFollowRequest request, + RequestOptions options, + ActionListener listener) { + restHighLevelClient.performRequestAsyncAndParseEntity( + request, + CcrRequestConverters::resumeFollow, + options, + AcknowledgedResponse::fromXContent, + listener, + Collections.emptySet()); + } + } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java index eee5715d58629..54576b7b8d8d7 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/CcrRequestConverters.java @@ -23,6 +23,7 @@ import org.apache.http.client.methods.HttpPut; import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutFollowRequest; +import org.elasticsearch.client.ccr.ResumeFollowRequest; import java.io.IOException; @@ -49,4 +50,14 @@ static Request pauseFollow(PauseFollowRequest pauseFollowRequest) { return new Request(HttpPost.METHOD_NAME, endpoint); } + static Request resumeFollow(ResumeFollowRequest resumeFollowRequest) throws IOException { + String endpoint = new RequestConverters.EndpointBuilder() + .addPathPart(resumeFollowRequest.getFollowerIndex()) + .addPathPartAsIs("_ccr", "resume_follow") + .build(); + Request request = new Request(HttpPost.METHOD_NAME, endpoint); + request.setEntity(createEntity(resumeFollowRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/FollowConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/FollowConfig.java new file mode 100644 index 0000000000000..eb9b5e80767db --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/FollowConfig.java @@ -0,0 +1,203 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ccr; + +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +public class FollowConfig { + + static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count"); + static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size"); + static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests"); + static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count"); + static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size"); + static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests"); + static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); + static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); + static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); + static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout"); + + private Integer maxReadRequestOperationCount; + private Integer maxOutstandingReadRequests; + private ByteSizeValue maxReadRequestSize; + private Integer maxWriteRequestOperationCount; + private ByteSizeValue maxWriteRequestSize; + private Integer maxOutstandingWriteRequests; + private Integer maxWriteBufferCount; + private ByteSizeValue maxWriteBufferSize; + private TimeValue maxRetryDelay; + private TimeValue readPollTimeout; + + FollowConfig() { + } + + public Integer getMaxReadRequestOperationCount() { + return maxReadRequestOperationCount; + } + + public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) { + this.maxReadRequestOperationCount = maxReadRequestOperationCount; + } + + public Integer getMaxOutstandingReadRequests() { + return maxOutstandingReadRequests; + } + + public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) { + this.maxOutstandingReadRequests = maxOutstandingReadRequests; + } + + public ByteSizeValue getMaxReadRequestSize() { + return maxReadRequestSize; + } + + public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) { + this.maxReadRequestSize = maxReadRequestSize; + } + + public Integer getMaxWriteRequestOperationCount() { + return maxWriteRequestOperationCount; + } + + public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) { + this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; + } + + public ByteSizeValue getMaxWriteRequestSize() { + return maxWriteRequestSize; + } + + public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) { + this.maxWriteRequestSize = maxWriteRequestSize; + } + + public Integer getMaxOutstandingWriteRequests() { + return maxOutstandingWriteRequests; + } + + public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) { + this.maxOutstandingWriteRequests = maxOutstandingWriteRequests; + } + + public Integer getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { + this.maxWriteBufferCount = maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { + this.maxWriteBufferSize = maxWriteBufferSize; + } + + public TimeValue getMaxRetryDelay() { + return maxRetryDelay; + } + + public void setMaxRetryDelay(TimeValue maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + } + + public TimeValue getReadPollTimeout() { + return readPollTimeout; + } + + public void setReadPollTimeout(TimeValue readPollTimeout) { + this.readPollTimeout = readPollTimeout; + } + + void toXContentFragment(XContentBuilder builder, ToXContent.Params params) throws IOException { + if (maxReadRequestOperationCount != null) { + builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount); + } + if (maxReadRequestSize != null) { + builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep()); + } + if (maxWriteRequestOperationCount != null) { + builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount); + } + if (maxWriteRequestSize != null) { + builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep()); + } + if (maxWriteBufferCount != null) { + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); + } + if (maxOutstandingReadRequests != null) { + builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests); + } + if (maxOutstandingWriteRequests != null) { + builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests); + } + if (maxRetryDelay != null) { + builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); + } + if (readPollTimeout != null) { + builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep()); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FollowConfig that = (FollowConfig) o; + return Objects.equals(maxReadRequestOperationCount, that.maxReadRequestOperationCount) && + Objects.equals(maxOutstandingReadRequests, that.maxOutstandingReadRequests) && + Objects.equals(maxReadRequestSize, that.maxReadRequestSize) && + Objects.equals(maxWriteRequestOperationCount, that.maxWriteRequestOperationCount) && + Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) && + Objects.equals(maxOutstandingWriteRequests, that.maxOutstandingWriteRequests) && + Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) && + Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && + Objects.equals(maxRetryDelay, that.maxRetryDelay) && + Objects.equals(readPollTimeout, that.readPollTimeout); + } + + @Override + public int hashCode() { + return Objects.hash( + maxReadRequestOperationCount, + maxOutstandingReadRequests, + maxReadRequestSize, + maxWriteRequestOperationCount, + maxWriteRequestSize, + maxOutstandingWriteRequests, + maxWriteBufferCount, + maxWriteBufferSize, + maxRetryDelay, + readPollTimeout + ); + } +} diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java index f3ea0ae2e9bfe..98e9d224564cf 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/PutFollowRequest.java @@ -21,43 +21,21 @@ import org.elasticsearch.client.Validatable; import org.elasticsearch.common.ParseField; -import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; import java.util.Objects; -public final class PutFollowRequest implements Validatable, ToXContentObject { +public final class PutFollowRequest extends FollowConfig implements Validatable, ToXContentObject { static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); - static final ParseField MAX_READ_REQUEST_OPERATION_COUNT = new ParseField("max_read_request_operation_count"); - static final ParseField MAX_READ_REQUEST_SIZE = new ParseField("max_read_request_size"); - static final ParseField MAX_OUTSTANDING_READ_REQUESTS = new ParseField("max_outstanding_read_requests"); - static final ParseField MAX_WRITE_REQUEST_OPERATION_COUNT = new ParseField("max_write_request_operation_count"); - static final ParseField MAX_WRITE_REQUEST_SIZE = new ParseField("max_write_request_size"); - static final ParseField MAX_OUTSTANDING_WRITE_REQUESTS = new ParseField("max_outstanding_write_requests"); - static final ParseField MAX_WRITE_BUFFER_COUNT = new ParseField("max_write_buffer_count"); - static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); - static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); - static final ParseField READ_POLL_TIMEOUT = new ParseField("read_poll_timeout"); private final String remoteCluster; private final String leaderIndex; private final String followerIndex; - private Integer maxReadRequestOperationCount; - private Integer maxOutstandingReadRequests; - private ByteSizeValue maxReadRequestSize; - private Integer maxWriteRequestOperationCount; - private ByteSizeValue maxWriteRequestSize; - private Integer maxOutstandingWriteRequests; - private Integer maxWriteBufferCount; - private ByteSizeValue maxWriteBufferSize; - private TimeValue maxRetryDelay; - private TimeValue readPollTimeout; public PutFollowRequest(String remoteCluster, String leaderIndex, String followerIndex) { this.remoteCluster = Objects.requireNonNull(remoteCluster, "remoteCluster"); @@ -71,36 +49,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); - if (maxReadRequestOperationCount != null) { - builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), maxReadRequestOperationCount); - } - if (maxReadRequestSize != null) { - builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), maxReadRequestSize.getStringRep()); - } - if (maxWriteRequestOperationCount != null) { - builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), maxWriteRequestOperationCount); - } - if (maxWriteRequestSize != null) { - builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), maxWriteRequestSize.getStringRep()); - } - if (maxWriteBufferCount != null) { - builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), maxWriteBufferCount); - } - if (maxWriteBufferSize != null) { - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize.getStringRep()); - } - if (maxOutstandingReadRequests != null) { - builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), maxOutstandingReadRequests); - } - if (maxOutstandingWriteRequests != null) { - builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), maxOutstandingWriteRequests); - } - if (maxRetryDelay != null) { - builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); - } - if (readPollTimeout != null) { - builder.field(READ_POLL_TIMEOUT.getPreferredName(), readPollTimeout.getStringRep()); - } + toXContentFragment(builder, params); builder.endObject(); return builder; } @@ -117,122 +66,24 @@ public String getFollowerIndex() { return followerIndex; } - public Integer getMaxReadRequestOperationCount() { - return maxReadRequestOperationCount; - } - - public void setMaxReadRequestOperationCount(Integer maxReadRequestOperationCount) { - this.maxReadRequestOperationCount = maxReadRequestOperationCount; - } - - public Integer getMaxOutstandingReadRequests() { - return maxOutstandingReadRequests; - } - - public void setMaxOutstandingReadRequests(Integer maxOutstandingReadRequests) { - this.maxOutstandingReadRequests = maxOutstandingReadRequests; - } - - public ByteSizeValue getMaxReadRequestSize() { - return maxReadRequestSize; - } - - public void setMaxReadRequestSize(ByteSizeValue maxReadRequestSize) { - this.maxReadRequestSize = maxReadRequestSize; - } - - public Integer getMaxWriteRequestOperationCount() { - return maxWriteRequestOperationCount; - } - - public void setMaxWriteRequestOperationCount(Integer maxWriteRequestOperationCount) { - this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; - } - - public ByteSizeValue getMaxWriteRequestSize() { - return maxWriteRequestSize; - } - - public void setMaxWriteRequestSize(ByteSizeValue maxWriteRequestSize) { - this.maxWriteRequestSize = maxWriteRequestSize; - } - - public Integer getMaxOutstandingWriteRequests() { - return maxOutstandingWriteRequests; - } - - public void setMaxOutstandingWriteRequests(Integer maxOutstandingWriteRequests) { - this.maxOutstandingWriteRequests = maxOutstandingWriteRequests; - } - - public Integer getMaxWriteBufferCount() { - return maxWriteBufferCount; - } - - public void setMaxWriteBufferCount(Integer maxWriteBufferCount) { - this.maxWriteBufferCount = maxWriteBufferCount; - } - - public ByteSizeValue getMaxWriteBufferSize() { - return maxWriteBufferSize; - } - - public void setMaxWriteBufferSize(ByteSizeValue maxWriteBufferSize) { - this.maxWriteBufferSize = maxWriteBufferSize; - } - - public TimeValue getMaxRetryDelay() { - return maxRetryDelay; - } - - public void setMaxRetryDelay(TimeValue maxRetryDelay) { - this.maxRetryDelay = maxRetryDelay; - } - - public TimeValue getReadPollTimeout() { - return readPollTimeout; - } - - public void setReadPollTimeout(TimeValue readPollTimeout) { - this.readPollTimeout = readPollTimeout; - } - @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; PutFollowRequest that = (PutFollowRequest) o; return Objects.equals(remoteCluster, that.remoteCluster) && Objects.equals(leaderIndex, that.leaderIndex) && - Objects.equals(followerIndex, that.followerIndex) && - Objects.equals(maxReadRequestOperationCount, that.maxReadRequestOperationCount) && - Objects.equals(maxOutstandingReadRequests, that.maxOutstandingReadRequests) && - Objects.equals(maxReadRequestSize, that.maxReadRequestSize) && - Objects.equals(maxWriteRequestOperationCount, that.maxWriteRequestOperationCount) && - Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) && - Objects.equals(maxOutstandingWriteRequests, that.maxOutstandingWriteRequests) && - Objects.equals(maxWriteBufferCount, that.maxWriteBufferCount) && - Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && - Objects.equals(maxRetryDelay, that.maxRetryDelay) && - Objects.equals(readPollTimeout, that.readPollTimeout); + Objects.equals(followerIndex, that.followerIndex); } @Override public int hashCode() { return Objects.hash( + super.hashCode(), remoteCluster, leaderIndex, - followerIndex, - maxReadRequestOperationCount, - maxOutstandingReadRequests, - maxReadRequestSize, - maxWriteRequestOperationCount, - maxWriteRequestSize, - maxOutstandingWriteRequests, - maxWriteBufferCount, - maxWriteBufferSize, - maxRetryDelay, - readPollTimeout + followerIndex ); } } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java new file mode 100644 index 0000000000000..d9ceb666afd2f --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ccr/ResumeFollowRequest.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ccr; + +import org.elasticsearch.client.Validatable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.client.ccr.PutFollowRequest.FOLLOWER_INDEX_FIELD; + +public final class ResumeFollowRequest extends FollowConfig implements Validatable, ToXContentObject { + + private final String followerIndex; + + public ResumeFollowRequest(String followerIndex) { + this.followerIndex = Objects.requireNonNull(followerIndex, "followerIndex"); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); + toXContentFragment(builder, params); + builder.endObject(); + return builder; + } + + public String getFollowerIndex() { + return followerIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + if (!super.equals(o)) return false; + ResumeFollowRequest that = (ResumeFollowRequest) o; + return Objects.equals(followerIndex, that.followerIndex); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), followerIndex); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java index f98cbbb2b85c0..9b0e9a4d312c8 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/CCRIT.java @@ -31,6 +31,7 @@ import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutFollowRequest; import org.elasticsearch.client.ccr.PutFollowResponse; +import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; @@ -63,7 +64,7 @@ public void setupRemoteClusterConfig() throws IOException { assertThat(updateSettingsResponse.isAcknowledged(), is(true)); } - public void testCCR() throws Exception { + public void testIndexFollowing() throws Exception { CcrClient ccrClient = highLevelClient().ccr(); CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader"); @@ -95,6 +96,23 @@ public void testCCR() throws Exception { PauseFollowRequest pauseFollowRequest = new PauseFollowRequest("follower"); AcknowledgedResponse pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync); assertThat(pauseFollowResponse.isAcknowledged(), is(true)); + + highLevelClient().index(indexRequest, RequestOptions.DEFAULT); + + ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest("follower"); + AcknowledgedResponse resumeFollowResponse = execute(resumeFollowRequest, ccrClient::resumeFollow, ccrClient::resumeFollowAsync); + assertThat(resumeFollowResponse.isAcknowledged(), is(true)); + + assertBusy(() -> { + SearchRequest followerSearchRequest = new SearchRequest("follower"); + SearchResponse followerSearchResponse = highLevelClient().search(followerSearchRequest, RequestOptions.DEFAULT); + assertThat(followerSearchResponse.getHits().getTotalHits(), equalTo(2L)); + }); + + // To avoid keeping shard follow tasks running after this test has finished: + pauseFollowRequest = new PauseFollowRequest("follower"); + pauseFollowResponse = execute(pauseFollowRequest, ccrClient::pauseFollow, ccrClient::pauseFollowAsync); + assertThat(pauseFollowResponse.isAcknowledged(), is(true)); } private static Map toMap(Response response) throws IOException { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java new file mode 100644 index 0000000000000..f68148c63180c --- /dev/null +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java @@ -0,0 +1,116 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.ccr; + +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractXContentTestCase; + +import java.io.IOException; + +public class ResumeFollowRequestTests extends AbstractXContentTestCase { + + private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("test_parser", + (args) -> new ResumeFollowRequest((String) args[0])); + + static { + PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD); + PARSER.declareInt(ResumeFollowRequest::setMaxReadRequestOperationCount, FollowConfig.MAX_READ_REQUEST_OPERATION_COUNT); + PARSER.declareField( + ResumeFollowRequest::setMaxReadRequestSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), FollowConfig.MAX_READ_REQUEST_SIZE.getPreferredName()), + PutFollowRequest.MAX_READ_REQUEST_SIZE, + ObjectParser.ValueType.STRING); + PARSER.declareInt(ResumeFollowRequest::setMaxOutstandingReadRequests, FollowConfig.MAX_OUTSTANDING_READ_REQUESTS); + PARSER.declareInt(ResumeFollowRequest::setMaxWriteRequestOperationCount, FollowConfig.MAX_WRITE_REQUEST_OPERATION_COUNT); + PARSER.declareField( + ResumeFollowRequest::setMaxWriteRequestSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), FollowConfig.MAX_WRITE_REQUEST_SIZE.getPreferredName()), + PutFollowRequest.MAX_WRITE_REQUEST_SIZE, + ObjectParser.ValueType.STRING); + PARSER.declareInt(ResumeFollowRequest::setMaxOutstandingWriteRequests, FollowConfig.MAX_OUTSTANDING_WRITE_REQUESTS); + PARSER.declareInt(ResumeFollowRequest::setMaxWriteBufferCount, FollowConfig.MAX_WRITE_BUFFER_COUNT); + PARSER.declareField( + ResumeFollowRequest::setMaxWriteBufferSize, + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), FollowConfig.MAX_WRITE_BUFFER_SIZE.getPreferredName()), + PutFollowRequest.MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); + PARSER.declareField( + ResumeFollowRequest::setMaxRetryDelay, + (p, c) -> TimeValue.parseTimeValue(p.text(), FollowConfig.MAX_RETRY_DELAY_FIELD.getPreferredName()), + PutFollowRequest.MAX_RETRY_DELAY_FIELD, + ObjectParser.ValueType.STRING); + PARSER.declareField( + ResumeFollowRequest::setReadPollTimeout, + (p, c) -> TimeValue.parseTimeValue(p.text(), FollowConfig.READ_POLL_TIMEOUT.getPreferredName()), + PutFollowRequest.READ_POLL_TIMEOUT, + ObjectParser.ValueType.STRING); + } + + @Override + protected ResumeFollowRequest doParseInstance(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return false; + } + + @Override + protected ResumeFollowRequest createTestInstance() { + ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(randomAlphaOfLength(4)); + if (randomBoolean()) { + resumeFollowRequest.setMaxOutstandingReadRequests(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxOutstandingWriteRequests(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxReadRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxReadRequestSize(new ByteSizeValue(randomNonNegativeLong())); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxWriteBufferCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxWriteBufferSize(new ByteSizeValue(randomNonNegativeLong())); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxWriteRequestOperationCount(randomIntBetween(0, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxWriteRequestSize(new ByteSizeValue(randomNonNegativeLong())); + } + if (randomBoolean()) { + resumeFollowRequest.setMaxRetryDelay(new TimeValue(randomNonNegativeLong())); + } + if (randomBoolean()) { + resumeFollowRequest.setReadPollTimeout(new TimeValue(randomNonNegativeLong())); + } + return resumeFollowRequest; + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java index 8df7e40fc9e77..52368793b9e9f 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/CCRDocumentationIT.java @@ -35,6 +35,7 @@ import org.elasticsearch.client.ccr.PauseFollowRequest; import org.elasticsearch.client.ccr.PutFollowRequest; import org.elasticsearch.client.ccr.PutFollowResponse; +import org.elasticsearch.client.ccr.ResumeFollowRequest; import org.elasticsearch.client.core.AcknowledgedResponse; import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; @@ -46,7 +47,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; public class CCRDocumentationIT extends ESRestHighLevelClientTestCase { @@ -198,11 +198,9 @@ public void onFailure(Exception e) { // Resume follow index, so that it can be paused again: { - // TODO: Replace this with high level rest client code when resume follow API is available: - final Request req = new Request("POST", "/" + followIndex + "/_ccr/resume_follow"); - req.setJsonEntity("{}"); - Response res = client().performRequest(req); - assertThat(res.getStatusLine().getStatusCode(), equalTo(200)); + ResumeFollowRequest resumeFollowRequest = new ResumeFollowRequest(followIndex); + AcknowledgedResponse resumeResponse = client.ccr().resumeFollow(resumeFollowRequest, RequestOptions.DEFAULT); + assertThat(resumeResponse.isAcknowledged(), is(true)); } // Replace the empty listener by a blocking listener in test @@ -217,6 +215,79 @@ public void onFailure(Exception e) { assertTrue(latch.await(30L, TimeUnit.SECONDS)); } + public void testResumeFollow() throws Exception { + RestHighLevelClient client = highLevelClient(); + { + // Create leader index: + CreateIndexRequest createIndexRequest = new CreateIndexRequest("leader"); + createIndexRequest.settings(Collections.singletonMap("index.soft_deletes.enabled", true)); + CreateIndexResponse response = client.indices().create(createIndexRequest, RequestOptions.DEFAULT); + assertThat(response.isAcknowledged(), is(true)); + } + String followIndex = "follower"; + // Follow index, so that it can be paused: + { + PutFollowRequest putFollowRequest = new PutFollowRequest("local", "leader", followIndex); + PutFollowResponse putFollowResponse = client.ccr().putFollow(putFollowRequest, RequestOptions.DEFAULT); + assertThat(putFollowResponse.isFollowIndexCreated(), is(true)); + assertThat(putFollowResponse.isFollowIndexShardsAcked(), is(true)); + assertThat(putFollowResponse.isIndexFollowingStarted(), is(true)); + } + + // Pause follow index, so that it can be resumed: + { + PauseFollowRequest pauseFollowRequest = new PauseFollowRequest(followIndex); + AcknowledgedResponse pauseResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT); + assertThat(pauseResponse.isAcknowledged(), is(true)); + } + + // tag::ccr-resume-follow-request + ResumeFollowRequest request = new ResumeFollowRequest(followIndex); // <1> + // end::ccr-resume-follow-request + + // tag::ccr-resume-follow-execute + AcknowledgedResponse response = + client.ccr().resumeFollow(request, RequestOptions.DEFAULT); + // end::ccr-resume-follow-execute + + // tag::ccr-resume-follow-response + boolean acknowledged = response.isAcknowledged(); // <1> + // end::ccr-resume-follow-response + + // Pause follow index, so that it can be resumed again: + { + PauseFollowRequest pauseFollowRequest = new PauseFollowRequest(followIndex); + AcknowledgedResponse pauseResponse = client.ccr().pauseFollow(pauseFollowRequest, RequestOptions.DEFAULT); + assertThat(pauseResponse.isAcknowledged(), is(true)); + } + + // tag::ccr-resume-follow-execute-listener + ActionListener listener = + new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse response) { + boolean acknowledged = response.isAcknowledged(); // <1> + } + + @Override + public void onFailure(Exception e) { + // <2> + } + }; + // end::ccr-resume-follow-execute-listener + + // Replace the empty listener by a blocking listener in test + final CountDownLatch latch = new CountDownLatch(1); + listener = new LatchedActionListener<>(listener, latch); + + // tag::ccr-resume-follow-execute-async + client.ccr() + .resumeFollowAsync(request, RequestOptions.DEFAULT, listener); // <1> + // end::ccr-resume-follow-execute-async + + assertTrue(latch.await(30L, TimeUnit.SECONDS)); + } + static Map toMap(Response response) throws IOException { return XContentHelper.convertToMap(JsonXContent.jsonXContent, EntityUtils.toString(response.getEntity()), false); } diff --git a/docs/java-rest/high-level/ccr/resume_follow.asciidoc b/docs/java-rest/high-level/ccr/resume_follow.asciidoc new file mode 100644 index 0000000000000..349440dbc9450 --- /dev/null +++ b/docs/java-rest/high-level/ccr/resume_follow.asciidoc @@ -0,0 +1,35 @@ +-- +:api: ccr-resume-follow +:request: ResumeFollowRequest +:response: ResumeFollowResponse +-- + +[id="{upid}-{api}"] +=== Resume Follow API + + +[id="{upid}-{api}-request"] +==== Request + +The Resume Follow API allows you to resume following a follower index that has been paused. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-request] +-------------------------------------------------- +<1> The name of follower index. + +[id="{upid}-{api}-response"] +==== Response + +The returned +{response}+ indicates if the resume follow request was received. + +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-response] +-------------------------------------------------- +<1> Whether or not the resume follow was acknowledged. + +include::../execution.asciidoc[] + + diff --git a/docs/java-rest/high-level/supported-apis.asciidoc b/docs/java-rest/high-level/supported-apis.asciidoc index 880a8621f0598..e7eb7022090bb 100644 --- a/docs/java-rest/high-level/supported-apis.asciidoc +++ b/docs/java-rest/high-level/supported-apis.asciidoc @@ -431,9 +431,11 @@ The Java High Level REST Client supports the following CCR APIs: * <<{upid}-ccr-put-follow>> * <<{upid}-ccr-pause-follow>> +* <<{upid}-ccr-resume-follow>> include::ccr/put_follow.asciidoc[] include::ccr/pause_follow.asciidoc[] +include::ccr/resume_follow.asciidoc[] == Index Lifecycle Management APIs From 1474d8b7f64d5fc800a3c1389ea0eb90d98fe6df Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 21 Nov 2018 08:06:09 +0100 Subject: [PATCH 2/2] lets support unknown fields in this test parser --- .../elasticsearch/client/ccr/ResumeFollowRequestTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java index f68148c63180c..3f00891331839 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ccr/ResumeFollowRequestTests.java @@ -31,7 +31,7 @@ public class ResumeFollowRequestTests extends AbstractXContentTestCase { private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>("test_parser", - (args) -> new ResumeFollowRequest((String) args[0])); + true, (args) -> new ResumeFollowRequest((String) args[0])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), PutFollowRequest.FOLLOWER_INDEX_FIELD); @@ -74,7 +74,7 @@ protected ResumeFollowRequest doParseInstance(XContentParser parser) throws IOEx @Override protected boolean supportsUnknownFields() { - return false; + return true; } @Override