diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 60b9f8f23e8b3..732f8527dc188 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -199,13 +199,13 @@ private static void refresh(String index) throws IOException { private static void followIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index c7ecbe184de88..ed0971f18250f 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -127,13 +127,13 @@ private static void refresh(String index) throws IOException { private static void followIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 3a524e5724980..46679d22520c3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -297,12 +297,16 @@ private void followLeaderIndex(String clusterAlias, Index indexToFollow, String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : clusterAlias + ":" + leaderIndexName; - FollowIndexAction.Request request = - new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, - pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(), - pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(), - pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), - pattern.getIdleShardRetryDelay()); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix); + request.setFollowerIndex(followIndexName); + request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount()); + request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); + request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes()); + request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); + request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); + request.setMaxRetryDelay(pattern.getMaxRetryDelay()); + request.setPollTimeout(pattern.getIdleShardRetryDelay()); // Execute if the create and follow api call succeeds: Runnable successHandler = () -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index bf2bbd5af8a5c..937ca0a009613 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.io.IOException; import java.util.ArrayList; @@ -64,8 +63,8 @@ public static class Request extends SingleShardRequest { private int maxOperationCount; private ShardId shardId; private String expectedHistoryUUID; - private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT; - private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; + private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT; + private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java index eccda262636d2..e9ee38fd1f9e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; @@ -55,6 +56,14 @@ public class TransportFollowIndexAction extends HandledTransportAction { + static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; + private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500); + private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; + private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240; + private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024; + private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1; + static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1); + private final Client client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -179,19 +188,8 @@ void start( String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata); String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; - ShardFollowTask shardFollowTask = new ShardFollowTask( - clusterNameAlias, - new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.getMaxBatchOperationCount(), - request.getMaxConcurrentReadBatches(), - request.getMaxOperationSizeInBytes(), - request.getMaxConcurrentWriteBatches(), - request.getMaxWriteBufferSize(), - request.getMaxRetryDelay(), - request.getPollTimeout(), - recordedLeaderShardHistoryUUID, - filteredHeaders); + final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request, + leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override @@ -299,6 +297,69 @@ static void validate( followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY); } + private static ShardFollowTask createShardFollowTask( + int shardId, + String clusterAliasName, + FollowIndexAction.Request request, + IndexMetaData leaderIndexMetadata, + IndexMetaData followIndexMetadata, + String recordedLeaderShardHistoryUUID, + Map filteredHeaders + ) { + int maxBatchOperationCount; + if (request.getMaxBatchOperationCount() != null) { + maxBatchOperationCount = request.getMaxBatchOperationCount(); + } else { + maxBatchOperationCount = DEFAULT_MAX_BATCH_OPERATION_COUNT; + } + + int maxConcurrentReadBatches; + if (request.getMaxConcurrentReadBatches() != null){ + maxConcurrentReadBatches = request.getMaxConcurrentReadBatches(); + } else { + maxConcurrentReadBatches = DEFAULT_MAX_CONCURRENT_READ_BATCHES; + } + + long maxOperationSizeInBytes; + if (request.getMaxOperationSizeInBytes() != null) { + maxOperationSizeInBytes = request.getMaxOperationSizeInBytes(); + } else { + maxOperationSizeInBytes = DEFAULT_MAX_BATCH_SIZE_IN_BYTES; + } + + int maxConcurrentWriteBatches; + if (request.getMaxConcurrentWriteBatches() != null) { + maxConcurrentWriteBatches = request.getMaxConcurrentWriteBatches(); + } else { + maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES; + } + + int maxWriteBufferSize; + if (request.getMaxWriteBufferSize() != null) { + maxWriteBufferSize = request.getMaxWriteBufferSize(); + } else { + maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE; + } + + TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay(); + TimeValue pollTimeout = request.getPollTimeout() == null ? DEFAULT_POLL_TIMEOUT : request.getPollTimeout(); + + return new ShardFollowTask( + clusterAliasName, + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + maxRetryDelay, + pollTimeout, + recordedLeaderShardHistoryUUID, + filteredHeaders + ); + } + private static String[] extractIndexShardHistoryUUIDs(Map ccrIndexMetaData) { String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS); return historyUUIDs.split(","); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 1e7e3fe42df27..a74b1e33cd26e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -192,16 +192,12 @@ private void assertNonCompliantLicense(final Exception e) { } private FollowIndexAction.Request getFollowRequest() { - return new FollowIndexAction.Request( - "leader", - "follower", - FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, - FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, - FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, - TimeValue.timeValueMillis(10), - TimeValue.timeValueMillis(10)); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex("leader"); + request.setFollowerIndex("follower"); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setPollTimeout(TimeValue.timeValueMillis(10)); + return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 3d1789389d775..e3238d8b58acc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -319,9 +319,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); atLeastDocsIndexed("index1", numDocsIndexed / 3); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize, - randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10), - randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + followRequest.setMaxBatchOperationCount(maxReadSize); + followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10)); + followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); + followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240)); CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); @@ -358,9 +360,10 @@ public void testFollowIndexAndCloseNode() throws Exception { }); thread.start(); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048), - randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10), - FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048)); + followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10)); + followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get(); long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(), @@ -438,7 +441,7 @@ public void testFollowNonExistentIndex() throws Exception { expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); } - public void testFollowIndex_lowMaxTranslogBytes() throws Exception { + public void testFollowIndexMaxOperationSizeInBytes() throws Exception { final String leaderIndexSettings = getIndexSettings(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); @@ -451,8 +454,8 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception { client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + followRequest.setMaxOperationSizeInBytes(1L); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); @@ -480,25 +483,21 @@ public void testDontFollowTheWrongIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("index3"); - FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); - followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + followRequest = createFollowRequest("index3", "index4"); createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); unfollowIndex("index2", "index4"); - FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4"); Exception e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet()); assertThat(e.getMessage(), containsString("follow index [index4] should reference")); - FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2"); e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet()); assertThat(e.getMessage(), containsString("follow index [index2] should reference")); } @@ -707,10 +706,12 @@ private void assertSameDocCount(String index1, String index2) throws Exception { }, 60, TimeUnit.SECONDS); } - public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) { - return new FollowIndexAction.Request(leaderIndex, followIndex, FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10)); + public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) { + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex(leaderIndex); + request.setFollowerIndex(followerIndex); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setPollTimeout(TimeValue.timeValueMillis(10)); + return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java index e5f7e693a7f1c..2bff73d223b57 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java @@ -40,24 +40,49 @@ protected boolean supportsUnknownFields() { } static FollowIndexAction.Request createTestRequest() { - return new FollowIndexAction.Request(randomAlphaOfLength(4), randomAlphaOfLength(4), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex(randomAlphaOfLength(4)); + request.setFollowerIndex(randomAlphaOfLength(4)); + if (randomBoolean()) { + request.setMaxBatchOperationCount(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentReadBatches(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentWriteBatches(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxOperationSizeInBytes(randomNonNegativeLong()); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setPollTimeout(TimeValue.timeValueMillis(500)); + } + return request; } public void testValidate() { - FollowIndexAction.Request request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, - null, TimeValue.ZERO, null); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex("index1"); + request.setFollowerIndex("index2"); + request.setMaxRetryDelay(TimeValue.ZERO); + ActionRequestValidationException validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]")); - request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, null, TimeValue.timeValueMinutes(10), null); + request.setMaxRetryDelay(TimeValue.timeValueMinutes(10)); validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]")); - request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, null, TimeValue.timeValueMinutes(1), null); + request.setMaxRetryDelay(TimeValue.timeValueMinutes(1)); validationException = request.validate(); assertThat(validationException, nullValue()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index b0181de812a38..e7d0987223bb9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import java.nio.charset.StandardCharsets; @@ -81,7 +80,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR new ShardId("leader_index", "", 0), testRun.maxOperationCount, concurrency, - FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, concurrency, 10240, TimeValue.timeValueMillis(10), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java index 65136b41a29e0..c90ef63862b9a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -30,14 +29,7 @@ public final class FollowIndexAction extends Action { public static final FollowIndexAction INSTANCE = new FollowIndexAction(); public static final String NAME = "cluster:admin/xpack/ccr/follow_index"; - public static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240; - public static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024; - public static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1; - public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; - public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; - static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500); - static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5); - public static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1); + public static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5); private FollowIndexAction() { super(NAME); @@ -59,30 +51,23 @@ public static class Request extends ActionRequest implements ToXContentObject { private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - (args, followerIndex) -> { - if (args[1] != null) { - followerIndex = (String) args[1]; - } - return new Request((String) args[0], followerIndex, (Integer) args[2], (Integer) args[3], (Long) args[4], - (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]); - }); + private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_INDEX_FIELD); - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOWER_INDEX_FIELD); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_READ_BATCHES); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_SIZE_IN_BYTES); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD); + PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD); + PARSER.declareInt(Request::setMaxBatchOperationCount, MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(Request::setMaxConcurrentReadBatches, MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(Request::setMaxOperationSizeInBytes, MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(Request::setMaxConcurrentWriteBatches, MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(Request::setMaxWriteBufferSize, MAX_WRITE_BUFFER_SIZE); PARSER.declareField( - ConstructingObjectParser.optionalConstructorArg(), + Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING); PARSER.declareField( - ConstructingObjectParser.optionalConstructorArg(), + Request::setPollTimeout, (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), POLL_TIMEOUT, ObjectParser.ValueType.STRING); @@ -108,6 +93,9 @@ public String getLeaderIndex() { return leaderIndex; } + public void setLeaderIndex(String leaderIndex) { + this.leaderIndex = leaderIndex; + } private String followerIndex; @@ -115,38 +103,66 @@ public String getFollowerIndex() { return followerIndex; } - private int maxBatchOperationCount; + public void setFollowerIndex(String followerIndex) { + this.followerIndex = followerIndex; + } + + private Integer maxBatchOperationCount; - public int getMaxBatchOperationCount() { + public Integer getMaxBatchOperationCount() { return maxBatchOperationCount; } - private int maxConcurrentReadBatches; + public void setMaxBatchOperationCount(Integer maxBatchOperationCount) { + this.maxBatchOperationCount = maxBatchOperationCount; + } - public int getMaxConcurrentReadBatches() { + private Integer maxConcurrentReadBatches; + + public Integer getMaxConcurrentReadBatches() { return maxConcurrentReadBatches; } - private long maxOperationSizeInBytes; + public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) { + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + } + + private Long maxOperationSizeInBytes; - public long getMaxOperationSizeInBytes() { + public Long getMaxOperationSizeInBytes() { return maxOperationSizeInBytes; } - private int maxConcurrentWriteBatches; + public void setMaxOperationSizeInBytes(Long maxOperationSizeInBytes) { + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + } + + private Integer maxConcurrentWriteBatches; - public int getMaxConcurrentWriteBatches() { + public Integer getMaxConcurrentWriteBatches() { return maxConcurrentWriteBatches; } - private int maxWriteBufferSize; + public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) { + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + } + + private Integer maxWriteBufferSize; - public int getMaxWriteBufferSize() { + public Integer getMaxWriteBufferSize() { return maxWriteBufferSize; } + public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + this.maxWriteBufferSize = maxWriteBufferSize; + } + private TimeValue maxRetryDelay; + public void setMaxRetryDelay(TimeValue maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + } + public TimeValue getMaxRetryDelay() { return maxRetryDelay; } @@ -157,88 +173,50 @@ public TimeValue getPollTimeout() { return pollTimeout; } - public Request( - final String leaderIndex, - final String followerIndex, - final Integer maxBatchOperationCount, - final Integer maxConcurrentReadBatches, - final Long maxOperationSizeInBytes, - final Integer maxConcurrentWriteBatches, - final Integer maxWriteBufferSize, - final TimeValue maxRetryDelay, - final TimeValue pollTimeout) { + public void setPollTimeout(TimeValue pollTimeout) { + this.pollTimeout = pollTimeout; + } + + public Request() { + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException e = null; if (leaderIndex == null) { - throw new IllegalArgumentException(LEADER_INDEX_FIELD.getPreferredName() + " is missing"); + e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); } - if (followerIndex == null) { - throw new IllegalArgumentException(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing"); + e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e); } - - final int actualMaxBatchOperationCount = - maxBatchOperationCount == null ? DEFAULT_MAX_BATCH_OPERATION_COUNT : maxBatchOperationCount; - if (actualMaxBatchOperationCount < 1) { - throw new IllegalArgumentException(MAX_BATCH_OPERATION_COUNT.getPreferredName() + " must be larger than 0"); + if (maxBatchOperationCount != null && maxBatchOperationCount < 1) { + e = addValidationError(MAX_BATCH_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e); } - - final int actualMaxConcurrentReadBatches = - maxConcurrentReadBatches == null ? DEFAULT_MAX_CONCURRENT_READ_BATCHES : maxConcurrentReadBatches; - if (actualMaxConcurrentReadBatches < 1) { - throw new IllegalArgumentException(MAX_CONCURRENT_READ_BATCHES.getPreferredName() + " must be larger than 0"); + if (maxConcurrentReadBatches != null && maxConcurrentReadBatches < 1) { + e = addValidationError(MAX_CONCURRENT_READ_BATCHES.getPreferredName() + " must be larger than 0", e); } - - final long actualMaxOperationSizeInBytes = - maxOperationSizeInBytes == null ? DEFAULT_MAX_BATCH_SIZE_IN_BYTES : maxOperationSizeInBytes; - if (actualMaxOperationSizeInBytes <= 0) { - throw new IllegalArgumentException(MAX_BATCH_SIZE_IN_BYTES.getPreferredName() + " must be larger than 0"); + if (maxOperationSizeInBytes != null && maxOperationSizeInBytes <= 0) { + e = addValidationError(MAX_BATCH_SIZE_IN_BYTES.getPreferredName() + " must be larger than 0", e); } - - final int actualMaxConcurrentWriteBatches = - maxConcurrentWriteBatches == null ? DEFAULT_MAX_CONCURRENT_WRITE_BATCHES : maxConcurrentWriteBatches; - if (actualMaxConcurrentWriteBatches < 1) { - throw new IllegalArgumentException(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0"); + if (maxConcurrentWriteBatches != null && maxConcurrentWriteBatches < 1) { + e = addValidationError(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0", e); } - - final int actualMaxWriteBufferSize = maxWriteBufferSize == null ? DEFAULT_MAX_WRITE_BUFFER_SIZE : maxWriteBufferSize; - if (actualMaxWriteBufferSize < 1) { - throw new IllegalArgumentException(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0"); + if (maxWriteBufferSize != null && maxWriteBufferSize < 1) { + e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e); } - - final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_MAX_RETRY_DELAY : maxRetryDelay; - final TimeValue actualPollTimeout = pollTimeout == null ? DEFAULT_POLL_TIMEOUT : pollTimeout; - - this.leaderIndex = leaderIndex; - this.followerIndex = followerIndex; - this.maxBatchOperationCount = actualMaxBatchOperationCount; - this.maxConcurrentReadBatches = actualMaxConcurrentReadBatches; - this.maxOperationSizeInBytes = actualMaxOperationSizeInBytes; - this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches; - this.maxWriteBufferSize = actualMaxWriteBufferSize; - this.maxRetryDelay = actualRetryTimeout; - this.pollTimeout = actualPollTimeout; - } - - public Request() { - - } - - @Override - public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - - if (maxRetryDelay.millis() <= 0) { + if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) { String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be positive but was [" + maxRetryDelay.getStringRep() + "]"; - validationException = addValidationError(message, validationException); + e = addValidationError(message, e); } - if (maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) { + if (maxRetryDelay != null && maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) { String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be less than [" + MAX_RETRY_DELAY + "] but was [" + maxRetryDelay.getStringRep() + "]"; - validationException = addValidationError(message, validationException); + e = addValidationError(message, e); } - return validationException; + return e; } @Override @@ -246,11 +224,11 @@ public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); leaderIndex = in.readString(); followerIndex = in.readString(); - maxBatchOperationCount = in.readVInt(); - maxConcurrentReadBatches = in.readVInt(); - maxOperationSizeInBytes = in.readVLong(); - maxConcurrentWriteBatches = in.readVInt(); - maxWriteBufferSize = in.readVInt(); + maxBatchOperationCount = in.readOptionalVInt(); + maxConcurrentReadBatches = in.readOptionalVInt(); + maxOperationSizeInBytes = in.readOptionalLong(); + maxConcurrentWriteBatches = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalVInt(); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -260,11 +238,11 @@ public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeString(leaderIndex); out.writeString(followerIndex); - out.writeVInt(maxBatchOperationCount); - out.writeVInt(maxConcurrentReadBatches); - out.writeVLong(maxOperationSizeInBytes); - out.writeVInt(maxConcurrentWriteBatches); - out.writeVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxBatchOperationCount); + out.writeOptionalVInt(maxConcurrentReadBatches); + out.writeOptionalLong(maxOperationSizeInBytes); + out.writeOptionalVInt(maxConcurrentWriteBatches); + out.writeOptionalVInt(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -275,13 +253,27 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa { builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); - builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); - builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); - builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); - builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); - builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); - builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); + if (maxBatchOperationCount != null) { + builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxOperationSizeInBytes != null) { + builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (maxConcurrentReadBatches != null) { + builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxConcurrentWriteBatches != null) { + builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (maxRetryDelay != null) { + builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); + } + if (pollTimeout != null) { + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); + } } builder.endObject(); return builder; @@ -292,11 +284,11 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return maxBatchOperationCount == request.maxBatchOperationCount && - maxConcurrentReadBatches == request.maxConcurrentReadBatches && - maxOperationSizeInBytes == request.maxOperationSizeInBytes && - maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && - maxWriteBufferSize == request.maxWriteBufferSize && + return Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) && + Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && + Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) && + Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout) && Objects.equals(leaderIndex, request.leaderIndex) &&