From a731f2addc3d8fcac6f2f8c68eecc0f52fe10986 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Sun, 13 Jan 2019 22:50:27 +0100 Subject: [PATCH 1/5] Add ccr follow info api This api returns all follower indices and per follower index the provided parameters at put follow / resume follow time and whether index following is paused or active. Closes #37127 --- docs/reference/ccr/apis/ccr-apis.asciidoc | 2 + .../ccr/apis/follow/get-follow-info.asciidoc | 171 +++++++ .../rest-api-spec/test/ccr/follow_info.yml | 75 ++++ .../java/org/elasticsearch/xpack/ccr/Ccr.java | 5 + .../ccr/action/TransportFollowInfoAction.java | 117 +++++ .../xpack/ccr/rest/RestFollowInfoAction.java | 39 ++ .../elasticsearch/xpack/ccr/AutoFollowIT.java | 50 ++- .../ccr/action/FollowInfoRequestTests.java | 25 ++ .../ccr/action/FollowInfoResponseTests.java | 147 ++++++ .../core/ccr/action/FollowInfoAction.java | 422 ++++++++++++++++++ .../rest-api-spec/api/ccr.follow_info.json | 16 + 11 files changed, 1050 insertions(+), 19 deletions(-) create mode 100644 docs/reference/ccr/apis/follow/get-follow-info.asciidoc create mode 100644 x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java create mode 100644 x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoRequestTests.java create mode 100644 x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java create mode 100644 x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow_info.json diff --git a/docs/reference/ccr/apis/ccr-apis.asciidoc b/docs/reference/ccr/apis/ccr-apis.asciidoc index f0b23c410eae3..2009742c8322b 100644 --- a/docs/reference/ccr/apis/ccr-apis.asciidoc +++ b/docs/reference/ccr/apis/ccr-apis.asciidoc @@ -22,6 +22,7 @@ You can use the following APIs to perform {ccr} operations. * <> * <> * <> +* <> [float] [[ccr-api-auto-follow]] @@ -40,6 +41,7 @@ include::follow/post-pause-follow.asciidoc[] include::follow/post-resume-follow.asciidoc[] include::follow/post-unfollow.asciidoc[] include::follow/get-follow-stats.asciidoc[] +include::follow/get-follow-info.asciidoc[] // auto-follow include::auto-follow/put-auto-follow-pattern.asciidoc[] diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc new file mode 100644 index 0000000000000..f9be0868f1085 --- /dev/null +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -0,0 +1,171 @@ +[role="xpack"] +[testenv="platinum"] +[[ccr-get-follow-info]] +=== Get follower info API +++++ +Get follower info +++++ + +beta[] + +Get follower info. + +==== Description + +This API returns information about all follower indices. + +For each follower index that gets returned, the configured parameters +(e.g. follower index name, leader index name, replication options) and +the status (whether follower index is active or paused) is included. + +==== Request + +////////////////////////// + +[source,js] +-------------------------------------------------- +PUT /follower_index/_ccr/follow +{ + "remote_cluster" : "remote_cluster", + "leader_index" : "leader_index" +} +-------------------------------------------------- +// CONSOLE +// TESTSETUP +// TEST[setup:remote_cluster_and_leader_index] + +[source,js] +-------------------------------------------------- +POST /follower_index/_ccr/pause_follow +-------------------------------------------------- +// CONSOLE +// TEARDOWN + +////////////////////////// + +[source,js] +-------------------------------------------------- +GET //_ccr/info +-------------------------------------------------- +// CONSOLE +// TEST[s//follower_index/] + +==== Path Parameters +`index` :: + (string) a comma-delimited list of follower index patterns + +==== Results + +This API returns the following information: + +`follower_indices`:: + (array) an array of follower index statistics + +The `indices` array consists of objects containing two fields: + +`indices[].follower_index`:: + (string) the name of the follower index + +`indices[].remote_cluster`:: + (string) the > containing the leader + index + +`indices[].leader_cluster`:: + (string) the name of the index in the leader cluster being followed + +`indices[].status`:: + (string) whether index following is `active` or `paused` + +`indices[].parameters`:: + (object) an object encapsulating ccr replication related parameters + +The `parameters` contains the following fields: + +`indices[].parameters.max_read_request_operation_count`:: + (integer) the configured maximum number of operations to pull per read from the remote + cluster + +`indices[].parameters.max_outstanding_read_requests`:: + (long) the configured maximum number of outstanding reads requests from the remote + cluster + +`indices[].parameters.max_read_request_size`:: + (<>) the configured maximum size in bytes of per read of a batch + of operations pulled from the remote cluster + +`indices[].parameters.max_write_request_operation_count`:: + (integer) the configured maximum number of operations per bulk write request executed on + the follower + +`indices[].parameters.max_write_request_size`:: + (<>) the configured maximum total bytes of operations per bulk write request + executed on the follower + +`indices[].parameters.max_outstanding_write_requests`:: + (integer) the configured maximum number of outstanding write requests on the follower + +`indices[].parameters.max_write_buffer_count`:: + (integer) th configurede maximum number of operations that can be queued for writing; + when this limit is reached, reads from the remote cluster will be deferred + until the number of queued operations goes below the limit + +`indices[].parameters.max_write_buffer_size`:: + (<>) the configured maximum total bytes of operations that can be queued for + writing; when this limit is reached, reads from the remote cluster will be + deferred until the total bytes of queued operations goes below the limit + +`indices[].parameters.max_retry_delay`:: + (<>) the configured maximum time to wait before retrying an + operation that failed exceptionally; an exponential backoff strategy is + employed when retrying + +`indices[].parameters.read_poll_timeout`:: + (<>) the configured maximum time to wait for new operations on the + remote cluster when the follower index is synchronized with the leader index; + when the timeout has elapsed, the poll for operations will return to the + follower so that it can update some statistics, and then the follower will + immediately attempt to read from the leader again + +==== Authorization + +If the {es} {security-features} are enabled, you must have `monitor` cluster +privileges. For more information, see {stack-ov}/security-privileges.html[Security privileges]. + +==== Example + +This example retrieves follower info: + +[source,js] +-------------------------------------------------- +GET /follower_index/_ccr/info +-------------------------------------------------- +// CONSOLE + +The API returns the following results: + +[source,js] +-------------------------------------------------- +{ + "follower_indices" : [ + { + "follower_index" : "follower_index", + "remote_cluster" : "remote_cluster", + "leader_index" : "leader_index", + "status" : "active", + "parameters" : { + "max_read_request_operation_count" : 5120, + "max_read_request_size" : "32mb", + "max_outstanding_read_requests" : 12, + "max_write_request_operation_count" : 5120, + "max_write_request_size" : "9223372036854775807b", + "max_outstanding_write_requests" : 9, + "max_write_buffer_count" : 2147483647, + "max_write_buffer_size" : "512mb", + "max_retry_delay" : "500ms", + "read_poll_timeout" : "1m" + } + } + ] +} +-------------------------------------------------- +// TESTRESPONSE diff --git a/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml new file mode 100644 index 0000000000000..f1e47d830cf97 --- /dev/null +++ b/x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_info.yml @@ -0,0 +1,75 @@ +--- +"Test info": + - do: + cluster.state: {} + + - set: {master_node: master} + + - do: + nodes.info: {} + + - set: {nodes.$master.transport_address: local_ip} + + - do: + cluster.put_settings: + body: + transient: + cluster.remote.local.seeds: $local_ip + flat_settings: true + + - match: {transient: {cluster.remote.local.seeds: $local_ip}} + + - do: + indices.create: + index: foo + body: + settings: + index: + number_of_shards: 1 + number_of_replicas: 0 + soft_deletes: + enabled: true + + - do: + ccr.follow: + index: bar + body: + remote_cluster: local + leader_index: foo + - is_true: follow_index_created + - is_true: follow_index_shards_acked + - is_true: index_following_started + + - do: + ccr.follow_info: + index: bar + - length: {follower_indices: 1} + - match: {follower_indices.0.follower_index: "bar"} + - match: {follower_indices.0.remote_cluster: "local"} + - match: {follower_indices.0.leader_index: "foo"} + - match: {follower_indices.0.status: "active"} + - match: {follower_indices.0.parameters.max_read_request_operation_count: 5120} + - match: {follower_indices.0.parameters.max_read_request_size: "32mb"} + - match: {follower_indices.0.parameters.max_outstanding_read_requests: 12} + - match: {follower_indices.0.parameters.max_write_request_operation_count: 5120} + - match: {follower_indices.0.parameters.max_write_request_size: "9223372036854775807b"} + - match: {follower_indices.0.parameters.max_outstanding_write_requests: 9} + - match: {follower_indices.0.parameters.max_write_buffer_count: 2147483647,} + - match: {follower_indices.0.parameters.max_write_buffer_size: "512mb"} + - match: {follower_indices.0.parameters.max_retry_delay: "500ms"} + - match: {follower_indices.0.parameters.read_poll_timeout: "1m"} + + - do: + ccr.pause_follow: + index: bar + - is_true: acknowledged + + - do: + ccr.follow_info: + index: bar + - length: {follower_indices: 1} + - match: {follower_indices.0.follower_index: "bar"} + - match: {follower_indices.0.remote_cluster: "local"} + - match: {follower_indices.0.leader_index: "foo"} + - match: {follower_indices.0.status: "paused"} + - is_false: follower_indices.0.parameters diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java index b3d2d05048211..2622a25edc81c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java @@ -50,6 +50,7 @@ import org.elasticsearch.xpack.ccr.action.ShardFollowTasksExecutor; import org.elasticsearch.xpack.ccr.action.TransportCcrStatsAction; import org.elasticsearch.xpack.ccr.action.TransportDeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.action.TransportFollowInfoAction; import org.elasticsearch.xpack.ccr.action.TransportFollowStatsAction; import org.elasticsearch.xpack.ccr.action.TransportGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.action.TransportPauseFollowAction; @@ -68,6 +69,7 @@ import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService; import org.elasticsearch.xpack.ccr.rest.RestCcrStatsAction; import org.elasticsearch.xpack.ccr.rest.RestDeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.ccr.rest.RestFollowInfoAction; import org.elasticsearch.xpack.ccr.rest.RestFollowStatsAction; import org.elasticsearch.xpack.ccr.rest.RestGetAutoFollowPatternAction; import org.elasticsearch.xpack.ccr.rest.RestPauseFollowAction; @@ -79,6 +81,7 @@ import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction; import org.elasticsearch.xpack.core.ccr.action.GetAutoFollowPatternAction; import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction; @@ -196,6 +199,7 @@ public List> getPersistentTasksExecutor(ClusterServic // stats action new ActionHandler<>(FollowStatsAction.INSTANCE, TransportFollowStatsAction.class), new ActionHandler<>(CcrStatsAction.INSTANCE, TransportCcrStatsAction.class), + new ActionHandler<>(FollowInfoAction.INSTANCE, TransportFollowInfoAction.class), // follow actions new ActionHandler<>(PutFollowAction.INSTANCE, TransportPutFollowAction.class), new ActionHandler<>(ResumeFollowAction.INSTANCE, TransportResumeFollowAction.class), @@ -219,6 +223,7 @@ public List getRestHandlers(Settings settings, RestController restC // stats API new RestFollowStatsAction(settings, restController), new RestCcrStatsAction(settings, restController), + new RestFollowInfoAction(settings, restController), // follow APIs new RestPutFollowAction(settings, restController), new RestResumeFollowAction(settings, restController), diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java new file mode 100644 index 0000000000000..3e9c0ecbef881 --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java @@ -0,0 +1,117 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.action.support.master.TransportMasterNodeReadAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; +import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.persistent.PersistentTasksCustomMetaData; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.ccr.Ccr; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class TransportFollowInfoAction extends TransportMasterNodeReadAction { + + @Inject + public TransportFollowInfoAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + super(FollowInfoAction.NAME, transportService, clusterService, threadPool, actionFilters, FollowInfoAction.Request::new, + indexNameExpressionResolver); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected FollowInfoAction.Response newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + protected FollowInfoAction.Response read(StreamInput in) throws IOException { + return new FollowInfoAction.Response(in); + } + + @Override + protected void masterOperation(FollowInfoAction.Request request, + ClusterState state, + ActionListener listener) throws Exception { + + List concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state, + IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices())); + + + List followerInfos = new ArrayList<>(); + PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE); + + for (IndexMetaData indexMetaData : state.metaData()) { + Map ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY); + if (ccrCustomData != null) { + Optional result; + if (persistentTasks != null) { + result = persistentTasks.taskMap().values().stream() + .map(persistentTask -> (ShardFollowTask) persistentTask.getParams()) + .filter(shardFollowTask -> concreteFollowerIndices.isEmpty() || + concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName())) + .findAny(); + } else { + result = Optional.empty(); + } + + String followerIndex = indexMetaData.getIndex().getName(); + String remoteCluster = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY); + String leaderIndex = ccrCustomData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY); + if (result.isPresent()) { + ShardFollowTask params = result.get(); + FollowParameters followParameters = new FollowParameters( + params.getMaxReadRequestOperationCount(), + params.getMaxReadRequestSize(), + params.getMaxOutstandingReadRequests(), + params.getMaxWriteRequestOperationCount(), + params.getMaxWriteRequestSize(), + params.getMaxOutstandingWriteRequests(), + params.getMaxWriteBufferCount(), + params.getMaxWriteBufferSize(), + params.getMaxRetryDelay(), + params.getReadPollTimeout() + ); + followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.ACTIVE, followParameters)); + } else { + followerInfos.add(new FollowerInfo(followerIndex, remoteCluster, leaderIndex, Status.PAUSED, null)); + } + } + } + + listener.onResponse(new FollowInfoAction.Response(followerInfos)); + } + + @Override + protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ); + } +} diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java new file mode 100644 index 0000000000000..f2e256bf5f88c --- /dev/null +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/rest/RestFollowInfoAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ccr.rest; + +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; + +import java.io.IOException; + +public class RestFollowInfoAction extends BaseRestHandler { + + public RestFollowInfoAction(final Settings settings, final RestController controller) { + super(settings); + controller.registerHandler(RestRequest.Method.GET, "/{index}/_ccr/info", this); + } + + @Override + public String getName() { + return "ccr_follower_info"; + } + + @Override + protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) throws IOException { + final FollowInfoAction.Request request = new FollowInfoAction.Request(); + request.setFollowerIndices(Strings.splitStringByCommaToArray(restRequest.param("index"))); + return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestToXContentListener<>(channel)); + } + +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java index 286e5badee133..a952bbc04a007 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/AutoFollowIT.java @@ -16,14 +16,16 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.CcrIntegTestCase; -import org.elasticsearch.xpack.ccr.action.ShardFollowTask; import org.elasticsearch.xpack.core.ccr.AutoFollowMetadata; import org.elasticsearch.xpack.core.ccr.AutoFollowStats; import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction; import org.elasticsearch.xpack.core.ccr.action.DeleteAutoFollowPatternAction; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; import org.elasticsearch.xpack.core.ccr.action.PutAutoFollowPatternAction; import java.util.Arrays; @@ -218,42 +220,52 @@ public void testAutoFollowParameterAreDelegated() throws Exception { createLeaderIndex("logs-201901", leaderIndexSettings); assertBusy(() -> { - PersistentTasksCustomMetaData persistentTasksMetaData = - followerClient().admin().cluster().prepareState().get().getState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE); - assertThat(persistentTasksMetaData, notNullValue()); - assertThat(persistentTasksMetaData.tasks().size(), equalTo(1)); - ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTasksMetaData.tasks().iterator().next().getParams(); - assertThat(shardFollowTask.getLeaderShardId().getIndexName(), equalTo("logs-201901")); - assertThat(shardFollowTask.getFollowShardId().getIndexName(), equalTo("copy-logs-201901")); + FollowInfoAction.Request followInfoRequest = new FollowInfoAction.Request(); + followInfoRequest.setFollowerIndices("copy-logs-201901"); + FollowInfoAction.Response followInfoResponse; + try { + followInfoResponse = followerClient().execute(FollowInfoAction.INSTANCE, followInfoRequest).actionGet(); + } catch (IndexNotFoundException e) { + throw new AssertionError(e); + } + + assertThat(followInfoResponse.getFollowInfos().size(), equalTo(1)); + FollowerInfo followerInfo = followInfoResponse.getFollowInfos().get(0); + assertThat(followerInfo.getFollowerIndex(), equalTo("copy-logs-201901")); + assertThat(followerInfo.getRemoteCluster(), equalTo("leader_cluster")); + assertThat(followerInfo.getLeaderIndex(), equalTo("logs-201901")); + + FollowParameters followParameters = followerInfo.getParameters(); + assertThat(followParameters, notNullValue()); if (request.getMaxWriteBufferCount() != null) { - assertThat(shardFollowTask.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount())); + assertThat(followParameters.getMaxWriteBufferCount(), equalTo(request.getMaxWriteBufferCount())); } if (request.getMaxWriteBufferSize() != null) { - assertThat(shardFollowTask.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); + assertThat(followParameters.getMaxWriteBufferSize(), equalTo(request.getMaxWriteBufferSize())); } if (request.getMaxConcurrentReadBatches() != null) { - assertThat(shardFollowTask.getMaxOutstandingReadRequests(), equalTo(request.getMaxConcurrentReadBatches())); + assertThat(followParameters.getMaxOutstandingReadRequests(), equalTo(request.getMaxConcurrentReadBatches())); } if (request.getMaxConcurrentWriteBatches() != null) { - assertThat(shardFollowTask.getMaxOutstandingWriteRequests(), equalTo(request.getMaxConcurrentWriteBatches())); + assertThat(followParameters.getMaxOutstandingWriteRequests(), equalTo(request.getMaxConcurrentWriteBatches())); } if (request.getMaxReadRequestOperationCount() != null) { - assertThat(shardFollowTask.getMaxReadRequestOperationCount(), equalTo(request.getMaxReadRequestOperationCount())); + assertThat(followParameters.getMaxReadRequestOperationCount(), equalTo(request.getMaxReadRequestOperationCount())); } if (request.getMaxReadRequestSize() != null) { - assertThat(shardFollowTask.getMaxReadRequestSize(), equalTo(request.getMaxReadRequestSize())); + assertThat(followParameters.getMaxReadRequestSize(), equalTo(request.getMaxReadRequestSize())); } if (request.getMaxRetryDelay() != null) { - assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay())); + assertThat(followParameters.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay())); } if (request.getReadPollTimeout() != null) { - assertThat(shardFollowTask.getReadPollTimeout(), equalTo(request.getReadPollTimeout())); + assertThat(followParameters.getReadPollTimeout(), equalTo(request.getReadPollTimeout())); } if (request.getMaxWriteRequestOperationCount() != null) { - assertThat(shardFollowTask.getMaxWriteRequestOperationCount(), equalTo(request.getMaxWriteRequestOperationCount())); + assertThat(followParameters.getMaxWriteRequestOperationCount(), equalTo(request.getMaxWriteRequestOperationCount())); } if (request.getMaxWriteRequestSize() != null) { - assertThat(shardFollowTask.getMaxWriteRequestSize(), equalTo(request.getMaxWriteRequestSize())); + assertThat(followParameters.getMaxWriteRequestSize(), equalTo(request.getMaxWriteRequestSize())); } }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoRequestTests.java new file mode 100644 index 0000000000000..d235d956e01d5 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoRequestTests.java @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; + +public class FollowInfoRequestTests extends AbstractWireSerializingTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return FollowInfoAction.Request::new; + } + + @Override + protected FollowInfoAction.Request createTestInstance() { + FollowInfoAction.Request request = new FollowInfoAction.Request(); + request.setFollowerIndices(generateRandomStringArray(4, 4, true, false)); + return request; + } +} diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java new file mode 100644 index 0000000000000..d21098506a121 --- /dev/null +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowInfoResponseTests.java @@ -0,0 +1,147 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ccr.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.AbstractSerializingTestCase; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction; +import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD; +import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowParameters; +import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status; + +public class FollowInfoResponseTests extends AbstractSerializingTestCase { + + static final ConstructingObjectParser PARAMETERS_PARSER = new ConstructingObjectParser<>( + "parameters_parser", + args -> { + return new FollowParameters( + (Integer) args[0], + (ByteSizeValue) args[1], + (Integer) args[2], + (Integer) args[3], + (ByteSizeValue) args[4], + (Integer) args[5], + (Integer) args[6], + (ByteSizeValue) args[7], + (TimeValue) args[8], + (TimeValue) args[9] + ); + }); + + static { + PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_READ_REQUEST_OPERATION_COUNT); + PARAMETERS_PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_READ_REQUEST_SIZE.getPreferredName()), + ShardFollowTask.MAX_READ_REQUEST_SIZE, + ObjectParser.ValueType.STRING); + PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_READ_REQUESTS); + PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_REQUEST_OPERATION_COUNT); + PARAMETERS_PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_REQUEST_SIZE.getPreferredName()), + ShardFollowTask.MAX_WRITE_REQUEST_SIZE, + ObjectParser.ValueType.STRING); + PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_OUTSTANDING_WRITE_REQUESTS); + PARAMETERS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), ShardFollowTask.MAX_WRITE_BUFFER_COUNT); + PARAMETERS_PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> ByteSizeValue.parseBytesSizeValue(p.text(), ShardFollowTask.MAX_WRITE_BUFFER_SIZE.getPreferredName()), + ShardFollowTask.MAX_WRITE_BUFFER_SIZE, + ObjectParser.ValueType.STRING); + PARAMETERS_PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.MAX_RETRY_DELAY.getPreferredName()), + ShardFollowTask.MAX_RETRY_DELAY, + ObjectParser.ValueType.STRING); + PARAMETERS_PARSER.declareField( + ConstructingObjectParser.constructorArg(), + (p, c) -> TimeValue.parseTimeValue(p.text(), ShardFollowTask.READ_POLL_TIMEOUT.getPreferredName()), + ShardFollowTask.READ_POLL_TIMEOUT, + ObjectParser.ValueType.STRING); + } + + static final ConstructingObjectParser INFO_PARSER = new ConstructingObjectParser<>( + "info_parser", + args -> { + return new FollowerInfo( + (String) args[0], + (String) args[1], + (String) args[2], + Status.fromString((String) args[3]), + (FollowParameters) args[4] + ); + }); + + static { + INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.FOLLOWER_INDEX_FIELD); + INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.REMOTE_CLUSTER_FIELD); + INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.LEADER_INDEX_FIELD); + INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.STATUS_FIELD); + INFO_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), PARAMETERS_PARSER, FollowerInfo.PARAMETERS_FIELD); + } + + @SuppressWarnings("unchecked") + static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "response", + args -> { + return new FollowInfoAction.Response( + (List) args[0] + ); + }); + + static { + PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), INFO_PARSER, FOLLOWER_INDICES_FIELD); + } + + @Override + protected FollowInfoAction.Response doParseInstance(XContentParser parser) throws IOException { + return PARSER.apply(parser, null); + } + + @Override + protected Writeable.Reader instanceReader() { + return FollowInfoAction.Response::new; + } + + @Override + protected FollowInfoAction.Response createTestInstance() { + int numInfos = randomIntBetween(0, 32); + List infos = new ArrayList<>(numInfos); + for (int i = 0; i < numInfos; i++) { + FollowParameters followParameters = null; + if (randomBoolean()) { + followParameters = new FollowParameters( + randomIntBetween(0, Integer.MAX_VALUE), + new ByteSizeValue(randomNonNegativeLong()), + randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), + new ByteSizeValue(randomNonNegativeLong()), + randomIntBetween(0, Integer.MAX_VALUE), + randomIntBetween(0, Integer.MAX_VALUE), + new ByteSizeValue(randomNonNegativeLong()), + new TimeValue(randomNonNegativeLong()), + new TimeValue(randomNonNegativeLong()) + ); + } + + infos.add(new FollowerInfo(randomAlphaOfLength(4), randomAlphaOfLength(4), randomAlphaOfLength(4), + randomFrom(Status.values()), followParameters)); + } + return new FollowInfoAction.Response(infos); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java new file mode 100644 index 0000000000000..11d4f22e1b7a8 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowInfoAction.java @@ -0,0 +1,422 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ccr.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.master.MasterNodeReadRequest; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_READ_REQUESTS; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_OUTSTANDING_WRITE_REQUESTS; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_OPERATION_COUNT; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_READ_REQUEST_SIZE; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_RETRY_DELAY_FIELD; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_COUNT; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_BUFFER_SIZE; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_OPERATION_COUNT; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.MAX_WRITE_REQUEST_SIZE; +import static org.elasticsearch.xpack.core.ccr.action.ResumeFollowAction.Request.READ_POLL_TIMEOUT; + +public class FollowInfoAction extends Action { + + public static final String NAME = "cluster:monitor/ccr/follow_info"; + + public static final FollowInfoAction INSTANCE = new FollowInfoAction(); + + private FollowInfoAction() { + super(NAME); + } + + @Override + public Response newResponse() { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return Response::new; + } + + public static class Request extends MasterNodeReadRequest { + + private String[] followerIndices; + + public Request() { + } + + public String[] getFollowerIndices() { + return followerIndices; + } + + public void setFollowerIndices(String... followerIndices) { + this.followerIndices = followerIndices; + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public Request(StreamInput in) throws IOException { + super(in); + followerIndices = in.readOptionalStringArray(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalStringArray(followerIndices); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Arrays.equals(followerIndices, request.followerIndices); + } + + @Override + public int hashCode() { + return Arrays.hashCode(followerIndices); + } + } + + public static class Response extends ActionResponse implements ToXContentObject { + + public static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices"); + + private final List followInfos; + + public Response(List followInfos) { + this.followInfos = followInfos; + } + + public List getFollowInfos() { + return followInfos; + } + + public Response(StreamInput in) throws IOException { + super(in); + followInfos = in.readList(FollowerInfo::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeList(followInfos); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray(FOLLOWER_INDICES_FIELD.getPreferredName()); + for (FollowerInfo followInfo : followInfos) { + followInfo.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return Objects.equals(followInfos, response.followInfos); + } + + @Override + public int hashCode() { + return Objects.hash(followInfos); + } + + public String toString() { + return Strings.toString(this); + } + + public static class FollowerInfo implements Writeable, ToXContentObject { + + public static final ParseField FOLLOWER_INDEX_FIELD = new ParseField("follower_index"); + public static final ParseField REMOTE_CLUSTER_FIELD = new ParseField("remote_cluster"); + public static final ParseField LEADER_INDEX_FIELD = new ParseField("leader_index"); + public static final ParseField STATUS_FIELD = new ParseField("status"); + public static final ParseField PARAMETERS_FIELD = new ParseField("parameters"); + + private final String followerIndex; + private final String remoteCluster; + private final String leaderIndex; + private final Status status; + private final FollowParameters parameters; + + public FollowerInfo(String followerIndex, String remoteCluster, String leaderIndex, Status status, + FollowParameters parameters) { + this.followerIndex = followerIndex; + this.remoteCluster = remoteCluster; + this.leaderIndex = leaderIndex; + this.status = status; + this.parameters = parameters; + } + + public String getFollowerIndex() { + return followerIndex; + } + + public String getRemoteCluster() { + return remoteCluster; + } + + public String getLeaderIndex() { + return leaderIndex; + } + + public Status getStatus() { + return status; + } + + public FollowParameters getParameters() { + return parameters; + } + + FollowerInfo(StreamInput in) throws IOException { + followerIndex = in.readString(); + remoteCluster = in.readString(); + leaderIndex = in.readString(); + status = Status.fromString(in.readString()); + parameters = in.readOptionalWriteable(FollowParameters::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(followerIndex); + out.writeString(remoteCluster); + out.writeString(leaderIndex); + out.writeString(status.name); + out.writeOptionalWriteable(parameters); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); + builder.field(REMOTE_CLUSTER_FIELD.getPreferredName(), remoteCluster); + builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); + builder.field(STATUS_FIELD.getPreferredName(), status.name); + if (parameters != null) { + builder.startObject(PARAMETERS_FIELD.getPreferredName()); + { + builder.field(MAX_READ_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxReadRequestOperationCount); + builder.field(MAX_READ_REQUEST_SIZE.getPreferredName(), parameters.maxReadRequestSize.getStringRep()); + builder.field(MAX_OUTSTANDING_READ_REQUESTS.getPreferredName(), parameters.maxOutstandingReadRequests); + builder.field(MAX_WRITE_REQUEST_OPERATION_COUNT.getPreferredName(), parameters.maxWriteRequestOperationCount); + builder.field(MAX_WRITE_REQUEST_SIZE.getPreferredName(), parameters.maxWriteRequestSize.getStringRep()); + builder.field(MAX_OUTSTANDING_WRITE_REQUESTS.getPreferredName(), parameters.maxOutstandingWriteRequests); + builder.field(MAX_WRITE_BUFFER_COUNT.getPreferredName(), parameters.maxWriteBufferCount); + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), parameters.maxWriteBufferSize.getStringRep()); + builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), parameters.maxRetryDelay.getStringRep()); + builder.field(READ_POLL_TIMEOUT.getPreferredName(), parameters.readPollTimeout.getStringRep()); + } + builder.endObject(); + } + builder.endObject(); + return builder; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FollowerInfo that = (FollowerInfo) o; + return Objects.equals(followerIndex, that.followerIndex) && + Objects.equals(remoteCluster, that.remoteCluster) && + Objects.equals(leaderIndex, that.leaderIndex) && + status == that.status && + Objects.equals(parameters, that.parameters); + } + + @Override + public int hashCode() { + return Objects.hash(followerIndex, remoteCluster, leaderIndex, status, parameters); + } + + public String toString() { + return Strings.toString(this); + } + } + + public static class FollowParameters implements Writeable { + + private final int maxReadRequestOperationCount; + private final ByteSizeValue maxReadRequestSize; + private final int maxOutstandingReadRequests; + private final int maxWriteRequestOperationCount; + private final ByteSizeValue maxWriteRequestSize; + private final int maxOutstandingWriteRequests; + private final int maxWriteBufferCount; + private final ByteSizeValue maxWriteBufferSize; + private final TimeValue maxRetryDelay; + private final TimeValue readPollTimeout; + + public FollowParameters(int maxReadRequestOperationCount, + ByteSizeValue maxReadRequestSize, int maxOutstandingReadRequests, + int maxWriteRequestOperationCount, ByteSizeValue maxWriteRequestSize, + int maxOutstandingWriteRequests, int maxWriteBufferCount, + ByteSizeValue maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue readPollTimeout) { + this.maxReadRequestOperationCount = maxReadRequestOperationCount; + this.maxReadRequestSize = maxReadRequestSize; + this.maxOutstandingReadRequests = maxOutstandingReadRequests; + this.maxWriteRequestOperationCount = maxWriteRequestOperationCount; + this.maxWriteRequestSize = maxWriteRequestSize; + this.maxOutstandingWriteRequests = maxOutstandingWriteRequests; + this.maxWriteBufferCount = maxWriteBufferCount; + this.maxWriteBufferSize = maxWriteBufferSize; + this.maxRetryDelay = maxRetryDelay; + this.readPollTimeout = readPollTimeout; + } + + public int getMaxReadRequestOperationCount() { + return maxReadRequestOperationCount; + } + + public ByteSizeValue getMaxReadRequestSize() { + return maxReadRequestSize; + } + + public int getMaxOutstandingReadRequests() { + return maxOutstandingReadRequests; + } + + public int getMaxWriteRequestOperationCount() { + return maxWriteRequestOperationCount; + } + + public ByteSizeValue getMaxWriteRequestSize() { + return maxWriteRequestSize; + } + + public int getMaxOutstandingWriteRequests() { + return maxOutstandingWriteRequests; + } + + public int getMaxWriteBufferCount() { + return maxWriteBufferCount; + } + + public ByteSizeValue getMaxWriteBufferSize() { + return maxWriteBufferSize; + } + + public TimeValue getMaxRetryDelay() { + return maxRetryDelay; + } + + public TimeValue getReadPollTimeout() { + return readPollTimeout; + } + + FollowParameters(StreamInput in) throws IOException { + this.maxReadRequestOperationCount = in.readVInt(); + this.maxReadRequestSize = new ByteSizeValue(in); + this.maxOutstandingReadRequests = in.readVInt(); + this.maxWriteRequestOperationCount = in.readVInt(); + this.maxWriteRequestSize = new ByteSizeValue(in); + this.maxOutstandingWriteRequests = in.readVInt(); + this.maxWriteBufferCount = in.readVInt(); + this.maxWriteBufferSize = new ByteSizeValue(in); + this.maxRetryDelay = in.readTimeValue(); + this.readPollTimeout = in.readTimeValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(maxReadRequestOperationCount); + maxReadRequestSize.writeTo(out); + out.writeVInt(maxOutstandingReadRequests); + out.writeVLong(maxWriteRequestOperationCount); + maxWriteRequestSize.writeTo(out); + out.writeVInt(maxOutstandingWriteRequests); + out.writeVInt(maxWriteBufferCount); + maxWriteBufferSize.writeTo(out); + out.writeTimeValue(maxRetryDelay); + out.writeTimeValue(readPollTimeout); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FollowParameters that = (FollowParameters) o; + return maxReadRequestOperationCount == that.maxReadRequestOperationCount && + maxOutstandingReadRequests == that.maxOutstandingReadRequests && + maxWriteRequestOperationCount == that.maxWriteRequestOperationCount && + maxOutstandingWriteRequests == that.maxOutstandingWriteRequests && + maxWriteBufferCount == that.maxWriteBufferCount && + Objects.equals(maxReadRequestSize, that.maxReadRequestSize) && + Objects.equals(maxWriteRequestSize, that.maxWriteRequestSize) && + Objects.equals(maxWriteBufferSize, that.maxWriteBufferSize) && + Objects.equals(maxRetryDelay, that.maxRetryDelay) && + Objects.equals(readPollTimeout, that.readPollTimeout); + } + + @Override + public int hashCode() { + return Objects.hash( + maxReadRequestOperationCount, + maxReadRequestSize, + maxOutstandingReadRequests, + maxWriteRequestOperationCount, + maxWriteRequestSize, + maxOutstandingWriteRequests, + maxWriteBufferCount, + maxWriteBufferSize, + maxRetryDelay, + readPollTimeout + ); + } + + } + + public enum Status { + + ACTIVE("active"), + PAUSED("paused"); + + private final String name; + + Status(String name) { + this.name = name; + } + + public static Status fromString(String value) { + switch (value) { + case "active": + return Status.ACTIVE; + case "paused": + return Status.PAUSED; + default: + throw new IllegalArgumentException("unexpected status value [" + value + "]"); + } + } + } + } + +} diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow_info.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow_info.json new file mode 100644 index 0000000000000..87fd387edc13a --- /dev/null +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/ccr.follow_info.json @@ -0,0 +1,16 @@ +{ + "ccr.follow_info": { + "documentation": "https://www.elastic.co/guide/en/elasticsearch/reference/current/ccr-get-follow-info.html", + "methods": [ "GET" ], + "url": { + "path": "/{index}/_ccr/info", + "paths": [ "/{index}/_ccr/info" ], + "parts": { + "index": { + "type": "list", + "description": "A comma-separated list of index patterns; use `_all` to perform the operation on all indices" + } + } + } + } +} From 4d4f4cbb8122b5f894186c9cc84babfae2bd6327 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Thu, 17 Jan 2019 09:30:15 +0100 Subject: [PATCH 2/5] iter --- docs/reference/ccr/apis/follow/get-follow-info.asciidoc | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc index f9be0868f1085..8125ed94e3e21 100644 --- a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -61,7 +61,7 @@ This API returns the following information: `follower_indices`:: (array) an array of follower index statistics -The `indices` array consists of objects containing two fields: +The `indices` array consists of objects containing several fields: `indices[].follower_index`:: (string) the name of the follower index @@ -70,7 +70,7 @@ The `indices` array consists of objects containing two fields: (string) the > containing the leader index -`indices[].leader_cluster`:: +`indices[].leader_index`:: (string) the name of the index in the leader cluster being followed `indices[].status`:: @@ -86,7 +86,7 @@ The `parameters` contains the following fields: cluster `indices[].parameters.max_outstanding_read_requests`:: - (long) the configured maximum number of outstanding reads requests from the remote + (long) the configured maximum number of outstanding read requests from the remote cluster `indices[].parameters.max_read_request_size`:: @@ -105,7 +105,7 @@ The `parameters` contains the following fields: (integer) the configured maximum number of outstanding write requests on the follower `indices[].parameters.max_write_buffer_count`:: - (integer) th configurede maximum number of operations that can be queued for writing; + (integer) the configured maximum number of operations that can be queued for writing; when this limit is reached, reads from the remote cluster will be deferred until the number of queued operations goes below the limit From 1b731592c3e511f9bdc06499b490058391fa7813 Mon Sep 17 00:00:00 2001 From: lcawl Date: Thu, 17 Jan 2019 11:50:12 -0800 Subject: [PATCH 3/5] [DOCS] Edits the get follower info API --- .../ccr/apis/follow/get-follow-info.asciidoc | 78 +++++++++---------- 1 file changed, 38 insertions(+), 40 deletions(-) diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc index 8125ed94e3e21..e4bb9603867cf 100644 --- a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -8,15 +8,13 @@ beta[] -Get follower info. +Retrieves information about all follower indices. ==== Description -This API returns information about all follower indices. - -For each follower index that gets returned, the configured parameters -(e.g. follower index name, leader index name, replication options) and -the status (whether follower index is active or paused) is included. +This API lists the configured parameters and the status for each follower index. +For example, the results include follower index names, leader index names, +replication options and whether the follower indices are active or paused. ==== Request @@ -52,84 +50,84 @@ GET //_ccr/info ==== Path Parameters `index` :: - (string) a comma-delimited list of follower index patterns + (string) A comma-delimited list of follower index patterns ==== Results This API returns the following information: `follower_indices`:: - (array) an array of follower index statistics + (array) An array of follower index statistics The `indices` array consists of objects containing several fields: `indices[].follower_index`:: - (string) the name of the follower index + (string) The name of the follower index `indices[].remote_cluster`:: - (string) the > containing the leader - index + (string) The > that contains the + leader index `indices[].leader_index`:: - (string) the name of the index in the leader cluster being followed + (string) The name of the index in the leader cluster that is followed `indices[].status`:: - (string) whether index following is `active` or `paused` + (string) Whether index following is `active` or `paused` `indices[].parameters`:: - (object) an object encapsulating ccr replication related parameters + (object) An object that encapsulates {ccr} parameters The `parameters` contains the following fields: `indices[].parameters.max_read_request_operation_count`:: - (integer) the configured maximum number of operations to pull per read from the remote - cluster + (integer) The configured maximum number of operations to pull per read from + the remote cluster `indices[].parameters.max_outstanding_read_requests`:: - (long) the configured maximum number of outstanding read requests from the remote - cluster + (long) The configured maximum number of outstanding read requests from the + remote cluster `indices[].parameters.max_read_request_size`:: - (<>) the configured maximum size in bytes of per read of a batch - of operations pulled from the remote cluster + (<>) The configured maximum size in bytes of per read + of a batch of operations pulled from the remote cluster `indices[].parameters.max_write_request_operation_count`:: - (integer) the configured maximum number of operations per bulk write request executed on - the follower + (integer) The configured maximum number of operations per bulk write request executed on the follower `indices[].parameters.max_write_request_size`:: - (<>) the configured maximum total bytes of operations per bulk write request - executed on the follower + (<>) The configured maximum total bytes of operations + per bulk write request executed on the follower `indices[].parameters.max_outstanding_write_requests`:: - (integer) the configured maximum number of outstanding write requests on the follower + (integer) The configured maximum number of outstanding write requests on the follower `indices[].parameters.max_write_buffer_count`:: - (integer) the configured maximum number of operations that can be queued for writing; - when this limit is reached, reads from the remote cluster will be deferred - until the number of queued operations goes below the limit + (integer) The configured maximum number of operations that can be queued for writing. When this limit is reached, reads from the remote cluster are + deferred until the number of queued operations goes below the limit `indices[].parameters.max_write_buffer_size`:: - (<>) the configured maximum total bytes of operations that can be queued for - writing; when this limit is reached, reads from the remote cluster will be - deferred until the total bytes of queued operations goes below the limit + (<>) The configured maximum total bytes of operations + that can be queued for writing. When this limit is reached, reads from the + remote cluster are deferred until the total bytes of queued operations goes + below the limit `indices[].parameters.max_retry_delay`:: - (<>) the configured maximum time to wait before retrying an - operation that failed exceptionally; an exponential backoff strategy is - employed when retrying + (<>) The configured maximum time to wait before + retrying an operation that failed exceptionally. An exponential backoff + strategy is employed when retrying `indices[].parameters.read_poll_timeout`:: - (<>) the configured maximum time to wait for new operations on the - remote cluster when the follower index is synchronized with the leader index; - when the timeout has elapsed, the poll for operations will return to the - follower so that it can update some statistics, and then the follower will - immediately attempt to read from the leader again + (<>) The configured maximum time to wait for new + operations on the remote cluster when the follower index is synchronized with + the leader index. When the timeout has elapsed, the poll for operations + returns to the follower so that it can update some statistics, then the + follower immediately attempts to read from the leader again ==== Authorization If the {es} {security-features} are enabled, you must have `monitor` cluster -privileges. For more information, see {stack-ov}/security-privileges.html[Security privileges]. +privileges. For more information, see +{stack-ov}/security-privileges.html[Security privileges]. ==== Example From 8667efb879080849392bf38e176b7951af98dbca Mon Sep 17 00:00:00 2001 From: lcawl Date: Thu, 17 Jan 2019 11:57:10 -0800 Subject: [PATCH 4/5] [DOCS] Fixes link to remote cluster --- docs/reference/ccr/apis/follow/get-follow-info.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc index e4bb9603867cf..5f6e420bca359 100644 --- a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -65,7 +65,7 @@ The `indices` array consists of objects containing several fields: (string) The name of the follower index `indices[].remote_cluster`:: - (string) The > that contains the + (string) The <> that contains the leader index `indices[].leader_index`:: From da5c396a52f6d9dd3e6ce5bbd9c91519789beadd Mon Sep 17 00:00:00 2001 From: lcawl Date: Thu, 17 Jan 2019 17:26:01 -0800 Subject: [PATCH 5/5] [DOCS] Clarifies descriptions for configured parameters --- .../ccr/apis/follow/get-follow-info.asciidoc | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc index 5f6e420bca359..22418db10887c 100644 --- a/docs/reference/ccr/apis/follow/get-follow-info.asciidoc +++ b/docs/reference/ccr/apis/follow/get-follow-info.asciidoc @@ -12,7 +12,7 @@ Retrieves information about all follower indices. ==== Description -This API lists the configured parameters and the status for each follower index. +This API lists the parameters and the status for each follower index. For example, the results include follower index names, leader index names, replication options and whether the follower indices are active or paused. @@ -80,48 +80,48 @@ The `indices` array consists of objects containing several fields: The `parameters` contains the following fields: `indices[].parameters.max_read_request_operation_count`:: - (integer) The configured maximum number of operations to pull per read from - the remote cluster + (integer) The maximum number of operations to pull per read from the remote + cluster `indices[].parameters.max_outstanding_read_requests`:: - (long) The configured maximum number of outstanding read requests from the - remote cluster + (long) The maximum number of outstanding read requests from the remote cluster `indices[].parameters.max_read_request_size`:: - (<>) The configured maximum size in bytes of per read - of a batch of operations pulled from the remote cluster + (<>) The maximum size in bytes of per read of a batch + of operations pulled from the remote cluster `indices[].parameters.max_write_request_operation_count`:: - (integer) The configured maximum number of operations per bulk write request executed on the follower + (integer) The maximum number of operations per bulk write request executed on + the follower `indices[].parameters.max_write_request_size`:: - (<>) The configured maximum total bytes of operations - per bulk write request executed on the follower + (<>) The maximum total bytes of operations per bulk + write request executed on the follower `indices[].parameters.max_outstanding_write_requests`:: - (integer) The configured maximum number of outstanding write requests on the follower + (integer) The maximum number of outstanding write requests on the follower `indices[].parameters.max_write_buffer_count`:: - (integer) The configured maximum number of operations that can be queued for writing. When this limit is reached, reads from the remote cluster are - deferred until the number of queued operations goes below the limit + (integer) The maximum number of operations that can be queued for writing. + When this limit is reached, reads from the remote cluster are deferred until + the number of queued operations goes below the limit `indices[].parameters.max_write_buffer_size`:: - (<>) The configured maximum total bytes of operations - that can be queued for writing. When this limit is reached, reads from the - remote cluster are deferred until the total bytes of queued operations goes - below the limit + (<>) The maximum total bytes of operations that can be + queued for writing. When this limit is reached, reads from the remote cluster + are deferred until the total bytes of queued operations goes below the limit `indices[].parameters.max_retry_delay`:: - (<>) The configured maximum time to wait before - retrying an operation that failed exceptionally. An exponential backoff - strategy is employed when retrying + (<>) The maximum time to wait before retrying an + operation that failed exceptionally. An exponential backoff strategy is + employed when retrying `indices[].parameters.read_poll_timeout`:: - (<>) The configured maximum time to wait for new - operations on the remote cluster when the follower index is synchronized with - the leader index. When the timeout has elapsed, the poll for operations - returns to the follower so that it can update some statistics, then the - follower immediately attempts to read from the leader again + (<>) The maximum time to wait for new operations on the + remote cluster when the follower index is synchronized with the leader index. + When the timeout has elapsed, the poll for operations returns to the follower + so that it can update some statistics, then the follower immediately attempts + to read from the leader again ==== Authorization