From c2021f63563b38e1bbbbe14fca03f5713ee3330c Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 18 Sep 2018 14:26:27 +0200 Subject: [PATCH 1/2] [CCR] Change FollowIndexAction.Request class to be more user friendly Instead of having one constructor that accepts all arguments, all parameters should be provided via setters. Only leader and follower index are required arguments. This makes using this class in tests and transport client easier. --- .../ccr/action/AutoFollowCoordinator.java | 16 +- .../xpack/ccr/action/ShardChangesAction.java | 5 +- .../action/TransportFollowIndexAction.java | 87 ++++++- .../elasticsearch/xpack/ccr/CcrLicenseIT.java | 16 +- .../xpack/ccr/ShardChangesIT.java | 45 ++-- .../ccr/action/FollowIndexRequestTests.java | 39 ++- .../ShardFollowNodeTaskRandomTests.java | 3 +- .../core/ccr/action/FollowIndexAction.java | 238 +++++++++--------- 8 files changed, 263 insertions(+), 186 deletions(-) diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 3a524e5724980..74c189d8b0334 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -297,12 +297,16 @@ private void followLeaderIndex(String clusterAlias, Index indexToFollow, String leaderIndexNameWithClusterAliasPrefix = clusterAlias.equals("_local_") ? leaderIndexName : clusterAlias + ":" + leaderIndexName; - FollowIndexAction.Request request = - new FollowIndexAction.Request(leaderIndexNameWithClusterAliasPrefix, followIndexName, - pattern.getMaxBatchOperationCount(), pattern.getMaxConcurrentReadBatches(), - pattern.getMaxOperationSizeInBytes(), pattern.getMaxConcurrentWriteBatches(), - pattern.getMaxWriteBufferSize(), pattern.getMaxRetryDelay(), - pattern.getIdleShardRetryDelay()); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex(leaderIndexNameWithClusterAliasPrefix); + request.setFollowerIndex(followIndexName); + request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount()); + request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); + request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes()); + request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); + request.setMaxWriteBufferSize(request.getMaxWriteBufferSize()); + request.setMaxRetryDelay(pattern.getMaxRetryDelay()); + request.setPollTimeout(pattern.getIdleShardRetryDelay()); // Execute if the create and follow api call succeeds: Runnable successHandler = () -> { diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java index bf2bbd5af8a5c..937ca0a009613 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import java.io.IOException; import java.util.ArrayList; @@ -64,8 +63,8 @@ public static class Request extends SingleShardRequest { private int maxOperationCount; private ShardId shardId; private String expectedHistoryUUID; - private TimeValue pollTimeout = FollowIndexAction.DEFAULT_POLL_TIMEOUT; - private long maxOperationSizeInBytes = FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; + private TimeValue pollTimeout = TransportFollowIndexAction.DEFAULT_POLL_TIMEOUT; + private long maxOperationSizeInBytes = TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES; public Request(ShardId shardId, String expectedHistoryUUID) { super(shardId.getIndexName()); diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java index eccda262636d2..e9ee38fd1f9e2 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java @@ -19,6 +19,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexingSlowLog; @@ -55,6 +56,14 @@ public class TransportFollowIndexAction extends HandledTransportAction { + static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; + private static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500); + private static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; + private static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240; + private static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024; + private static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1; + static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1); + private final Client client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -179,19 +188,8 @@ void start( String[] recordedLeaderShardHistoryUUIDs = extractIndexShardHistoryUUIDs(ccrIndexMetadata); String recordedLeaderShardHistoryUUID = recordedLeaderShardHistoryUUIDs[shardId]; - ShardFollowTask shardFollowTask = new ShardFollowTask( - clusterNameAlias, - new ShardId(followIndexMetadata.getIndex(), shardId), - new ShardId(leaderIndexMetadata.getIndex(), shardId), - request.getMaxBatchOperationCount(), - request.getMaxConcurrentReadBatches(), - request.getMaxOperationSizeInBytes(), - request.getMaxConcurrentWriteBatches(), - request.getMaxWriteBufferSize(), - request.getMaxRetryDelay(), - request.getPollTimeout(), - recordedLeaderShardHistoryUUID, - filteredHeaders); + final ShardFollowTask shardFollowTask = createShardFollowTask(shardId, clusterNameAlias, request, + leaderIndexMetadata, followIndexMetadata, recordedLeaderShardHistoryUUID, filteredHeaders); persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask, new ActionListener>() { @Override @@ -299,6 +297,69 @@ static void validate( followerMapperService.merge(leaderIndex, MapperService.MergeReason.MAPPING_RECOVERY); } + private static ShardFollowTask createShardFollowTask( + int shardId, + String clusterAliasName, + FollowIndexAction.Request request, + IndexMetaData leaderIndexMetadata, + IndexMetaData followIndexMetadata, + String recordedLeaderShardHistoryUUID, + Map filteredHeaders + ) { + int maxBatchOperationCount; + if (request.getMaxBatchOperationCount() != null) { + maxBatchOperationCount = request.getMaxBatchOperationCount(); + } else { + maxBatchOperationCount = DEFAULT_MAX_BATCH_OPERATION_COUNT; + } + + int maxConcurrentReadBatches; + if (request.getMaxConcurrentReadBatches() != null){ + maxConcurrentReadBatches = request.getMaxConcurrentReadBatches(); + } else { + maxConcurrentReadBatches = DEFAULT_MAX_CONCURRENT_READ_BATCHES; + } + + long maxOperationSizeInBytes; + if (request.getMaxOperationSizeInBytes() != null) { + maxOperationSizeInBytes = request.getMaxOperationSizeInBytes(); + } else { + maxOperationSizeInBytes = DEFAULT_MAX_BATCH_SIZE_IN_BYTES; + } + + int maxConcurrentWriteBatches; + if (request.getMaxConcurrentWriteBatches() != null) { + maxConcurrentWriteBatches = request.getMaxConcurrentWriteBatches(); + } else { + maxConcurrentWriteBatches = DEFAULT_MAX_CONCURRENT_WRITE_BATCHES; + } + + int maxWriteBufferSize; + if (request.getMaxWriteBufferSize() != null) { + maxWriteBufferSize = request.getMaxWriteBufferSize(); + } else { + maxWriteBufferSize = DEFAULT_MAX_WRITE_BUFFER_SIZE; + } + + TimeValue maxRetryDelay = request.getMaxRetryDelay() == null ? DEFAULT_MAX_RETRY_DELAY : request.getMaxRetryDelay(); + TimeValue pollTimeout = request.getPollTimeout() == null ? DEFAULT_POLL_TIMEOUT : request.getPollTimeout(); + + return new ShardFollowTask( + clusterAliasName, + new ShardId(followIndexMetadata.getIndex(), shardId), + new ShardId(leaderIndexMetadata.getIndex(), shardId), + maxBatchOperationCount, + maxConcurrentReadBatches, + maxOperationSizeInBytes, + maxConcurrentWriteBatches, + maxWriteBufferSize, + maxRetryDelay, + pollTimeout, + recordedLeaderShardHistoryUUID, + filteredHeaders + ); + } + private static String[] extractIndexShardHistoryUUIDs(Map ccrIndexMetaData) { String historyUUIDs = ccrIndexMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS); return historyUUIDs.split(","); diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java index 1e7e3fe42df27..a74b1e33cd26e 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/CcrLicenseIT.java @@ -192,16 +192,12 @@ private void assertNonCompliantLicense(final Exception e) { } private FollowIndexAction.Request getFollowRequest() { - return new FollowIndexAction.Request( - "leader", - "follower", - FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, - FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, - FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, - TimeValue.timeValueMillis(10), - TimeValue.timeValueMillis(10)); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex("leader"); + request.setFollowerIndex("follower"); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setPollTimeout(TimeValue.timeValueMillis(10)); + return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java index 3d1789389d775..e3238d8b58acc 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java @@ -319,9 +319,11 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure) long numDocsIndexed = Math.min(3000 * 2, randomLongBetween(maxReadSize, maxReadSize * 10)); atLeastDocsIndexed("index1", numDocsIndexed / 3); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", maxReadSize, - randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10), - randomIntBetween(1024, 10240), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + followRequest.setMaxBatchOperationCount(maxReadSize); + followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10)); + followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); + followRequest.setMaxWriteBufferSize(randomIntBetween(1024, 10240)); CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); @@ -358,9 +360,10 @@ public void testFollowIndexAndCloseNode() throws Exception { }); thread.start(); - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048), - randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10), - FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + followRequest.setMaxBatchOperationCount(randomIntBetween(32, 2048)); + followRequest.setMaxConcurrentReadBatches(randomIntBetween(2, 10)); + followRequest.setMaxConcurrentWriteBatches(randomIntBetween(2, 10)); client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get(); long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(), @@ -438,7 +441,7 @@ public void testFollowNonExistentIndex() throws Exception { expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet()); } - public void testFollowIndex_lowMaxTranslogBytes() throws Exception { + public void testFollowIndexMaxOperationSizeInBytes() throws Exception { final String leaderIndexSettings = getIndexSettings(1, between(0, 1), singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true")); assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON)); @@ -451,8 +454,8 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception { client().prepareIndex("index1", "doc", Integer.toString(i)).setSource(source, XContentType.JSON).get(); } - final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); + followRequest.setMaxOperationSizeInBytes(1L); final CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); @@ -480,25 +483,21 @@ public void testDontFollowTheWrongIndex() throws Exception { assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON)); ensureGreen("index3"); - FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request followRequest = createFollowRequest("index1", "index2"); CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); - followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + followRequest = createFollowRequest("index3", "index4"); createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest); client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get(); unfollowIndex("index2", "index4"); - FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request wrongRequest1 = createFollowRequest("index1", "index4"); Exception e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet()); assertThat(e.getMessage(), containsString("follow index [index4] should reference")); - FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L, - 1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10)); + FollowIndexAction.Request wrongRequest2 = createFollowRequest("index3", "index2"); e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet()); assertThat(e.getMessage(), containsString("follow index [index2] should reference")); } @@ -707,10 +706,12 @@ private void assertSameDocCount(String index1, String index2) throws Exception { }, 60, TimeUnit.SECONDS); } - public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followIndex) { - return new FollowIndexAction.Request(leaderIndex, followIndex, FollowIndexAction.DEFAULT_MAX_BATCH_OPERATION_COUNT, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_READ_BATCHES, FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, - FollowIndexAction.DEFAULT_MAX_CONCURRENT_WRITE_BATCHES, FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, - TimeValue.timeValueMillis(10), TimeValue.timeValueMillis(10)); + public static FollowIndexAction.Request createFollowRequest(String leaderIndex, String followerIndex) { + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex(leaderIndex); + request.setFollowerIndex(followerIndex); + request.setMaxRetryDelay(TimeValue.timeValueMillis(10)); + request.setPollTimeout(TimeValue.timeValueMillis(10)); + return request; } } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java index e5f7e693a7f1c..2bff73d223b57 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/FollowIndexRequestTests.java @@ -40,24 +40,49 @@ protected boolean supportsUnknownFields() { } static FollowIndexAction.Request createTestRequest() { - return new FollowIndexAction.Request(randomAlphaOfLength(4), randomAlphaOfLength(4), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), randomNonNegativeLong(), randomIntBetween(1, Integer.MAX_VALUE), - randomIntBetween(1, Integer.MAX_VALUE), TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(500)); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex(randomAlphaOfLength(4)); + request.setFollowerIndex(randomAlphaOfLength(4)); + if (randomBoolean()) { + request.setMaxBatchOperationCount(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentReadBatches(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxConcurrentWriteBatches(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxOperationSizeInBytes(randomNonNegativeLong()); + } + if (randomBoolean()) { + request.setMaxWriteBufferSize(randomIntBetween(1, Integer.MAX_VALUE)); + } + if (randomBoolean()) { + request.setMaxRetryDelay(TimeValue.timeValueMillis(500)); + } + if (randomBoolean()) { + request.setPollTimeout(TimeValue.timeValueMillis(500)); + } + return request; } public void testValidate() { - FollowIndexAction.Request request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, - null, TimeValue.ZERO, null); + FollowIndexAction.Request request = new FollowIndexAction.Request(); + request.setLeaderIndex("index1"); + request.setFollowerIndex("index2"); + request.setMaxRetryDelay(TimeValue.ZERO); + ActionRequestValidationException validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be positive but was [0ms]")); - request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, null, TimeValue.timeValueMinutes(10), null); + request.setMaxRetryDelay(TimeValue.timeValueMinutes(10)); validationException = request.validate(); assertThat(validationException, notNullValue()); assertThat(validationException.getMessage(), containsString("[max_retry_delay] must be less than [5m] but was [10m]")); - request = new FollowIndexAction.Request("index1", "index2", null, null, null, null, null, TimeValue.timeValueMinutes(1), null); + request.setMaxRetryDelay(TimeValue.timeValueMinutes(1)); validationException = request.validate(); assertThat(validationException, nullValue()); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java index b0181de812a38..e7d0987223bb9 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskRandomTests.java @@ -15,7 +15,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse; -import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction; import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus; import java.nio.charset.StandardCharsets; @@ -81,7 +80,7 @@ private ShardFollowNodeTask createShardFollowTask(int concurrency, TestRun testR new ShardId("leader_index", "", 0), testRun.maxOperationCount, concurrency, - FollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, + TransportFollowIndexAction.DEFAULT_MAX_BATCH_SIZE_IN_BYTES, concurrency, 10240, TimeValue.timeValueMillis(10), diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java index 65136b41a29e0..c90ef63862b9a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/FollowIndexAction.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -30,14 +29,7 @@ public final class FollowIndexAction extends Action { public static final FollowIndexAction INSTANCE = new FollowIndexAction(); public static final String NAME = "cluster:admin/xpack/ccr/follow_index"; - public static final int DEFAULT_MAX_WRITE_BUFFER_SIZE = 10240; - public static final int DEFAULT_MAX_BATCH_OPERATION_COUNT = 1024; - public static final int DEFAULT_MAX_CONCURRENT_READ_BATCHES = 1; - public static final int DEFAULT_MAX_CONCURRENT_WRITE_BATCHES = 1; - public static final long DEFAULT_MAX_BATCH_SIZE_IN_BYTES = Long.MAX_VALUE; - static final TimeValue DEFAULT_MAX_RETRY_DELAY = new TimeValue(500); - static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5); - public static final TimeValue DEFAULT_POLL_TIMEOUT = TimeValue.timeValueMinutes(1); + public static final TimeValue MAX_RETRY_DELAY = TimeValue.timeValueMinutes(5); private FollowIndexAction() { super(NAME); @@ -59,30 +51,23 @@ public static class Request extends ActionRequest implements ToXContentObject { private static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size"); private static final ParseField MAX_RETRY_DELAY_FIELD = new ParseField("max_retry_delay"); private static final ParseField POLL_TIMEOUT = new ParseField("poll_timeout"); - private static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, - (args, followerIndex) -> { - if (args[1] != null) { - followerIndex = (String) args[1]; - } - return new Request((String) args[0], followerIndex, (Integer) args[2], (Integer) args[3], (Long) args[4], - (Integer) args[5], (Integer) args[6], (TimeValue) args[7], (TimeValue) args[8]); - }); + private static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); static { - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), LEADER_INDEX_FIELD); - PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FOLLOWER_INDEX_FIELD); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_OPERATION_COUNT); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_READ_BATCHES); - PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), MAX_BATCH_SIZE_IN_BYTES); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_CONCURRENT_WRITE_BATCHES); - PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), MAX_WRITE_BUFFER_SIZE); + PARSER.declareString(Request::setLeaderIndex, LEADER_INDEX_FIELD); + PARSER.declareString(Request::setFollowerIndex, FOLLOWER_INDEX_FIELD); + PARSER.declareInt(Request::setMaxBatchOperationCount, MAX_BATCH_OPERATION_COUNT); + PARSER.declareInt(Request::setMaxConcurrentReadBatches, MAX_CONCURRENT_READ_BATCHES); + PARSER.declareLong(Request::setMaxOperationSizeInBytes, MAX_BATCH_SIZE_IN_BYTES); + PARSER.declareInt(Request::setMaxConcurrentWriteBatches, MAX_CONCURRENT_WRITE_BATCHES); + PARSER.declareInt(Request::setMaxWriteBufferSize, MAX_WRITE_BUFFER_SIZE); PARSER.declareField( - ConstructingObjectParser.optionalConstructorArg(), + Request::setMaxRetryDelay, (p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY_FIELD.getPreferredName()), MAX_RETRY_DELAY_FIELD, ObjectParser.ValueType.STRING); PARSER.declareField( - ConstructingObjectParser.optionalConstructorArg(), + Request::setPollTimeout, (p, c) -> TimeValue.parseTimeValue(p.text(), POLL_TIMEOUT.getPreferredName()), POLL_TIMEOUT, ObjectParser.ValueType.STRING); @@ -108,6 +93,9 @@ public String getLeaderIndex() { return leaderIndex; } + public void setLeaderIndex(String leaderIndex) { + this.leaderIndex = leaderIndex; + } private String followerIndex; @@ -115,38 +103,66 @@ public String getFollowerIndex() { return followerIndex; } - private int maxBatchOperationCount; + public void setFollowerIndex(String followerIndex) { + this.followerIndex = followerIndex; + } + + private Integer maxBatchOperationCount; - public int getMaxBatchOperationCount() { + public Integer getMaxBatchOperationCount() { return maxBatchOperationCount; } - private int maxConcurrentReadBatches; + public void setMaxBatchOperationCount(Integer maxBatchOperationCount) { + this.maxBatchOperationCount = maxBatchOperationCount; + } - public int getMaxConcurrentReadBatches() { + private Integer maxConcurrentReadBatches; + + public Integer getMaxConcurrentReadBatches() { return maxConcurrentReadBatches; } - private long maxOperationSizeInBytes; + public void setMaxConcurrentReadBatches(Integer maxConcurrentReadBatches) { + this.maxConcurrentReadBatches = maxConcurrentReadBatches; + } + + private Long maxOperationSizeInBytes; - public long getMaxOperationSizeInBytes() { + public Long getMaxOperationSizeInBytes() { return maxOperationSizeInBytes; } - private int maxConcurrentWriteBatches; + public void setMaxOperationSizeInBytes(Long maxOperationSizeInBytes) { + this.maxOperationSizeInBytes = maxOperationSizeInBytes; + } + + private Integer maxConcurrentWriteBatches; - public int getMaxConcurrentWriteBatches() { + public Integer getMaxConcurrentWriteBatches() { return maxConcurrentWriteBatches; } - private int maxWriteBufferSize; + public void setMaxConcurrentWriteBatches(Integer maxConcurrentWriteBatches) { + this.maxConcurrentWriteBatches = maxConcurrentWriteBatches; + } + + private Integer maxWriteBufferSize; - public int getMaxWriteBufferSize() { + public Integer getMaxWriteBufferSize() { return maxWriteBufferSize; } + public void setMaxWriteBufferSize(Integer maxWriteBufferSize) { + this.maxWriteBufferSize = maxWriteBufferSize; + } + private TimeValue maxRetryDelay; + public void setMaxRetryDelay(TimeValue maxRetryDelay) { + this.maxRetryDelay = maxRetryDelay; + } + public TimeValue getMaxRetryDelay() { return maxRetryDelay; } @@ -157,88 +173,50 @@ public TimeValue getPollTimeout() { return pollTimeout; } - public Request( - final String leaderIndex, - final String followerIndex, - final Integer maxBatchOperationCount, - final Integer maxConcurrentReadBatches, - final Long maxOperationSizeInBytes, - final Integer maxConcurrentWriteBatches, - final Integer maxWriteBufferSize, - final TimeValue maxRetryDelay, - final TimeValue pollTimeout) { + public void setPollTimeout(TimeValue pollTimeout) { + this.pollTimeout = pollTimeout; + } + + public Request() { + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException e = null; if (leaderIndex == null) { - throw new IllegalArgumentException(LEADER_INDEX_FIELD.getPreferredName() + " is missing"); + e = addValidationError(LEADER_INDEX_FIELD.getPreferredName() + " is missing", e); } - if (followerIndex == null) { - throw new IllegalArgumentException(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing"); + e = addValidationError(FOLLOWER_INDEX_FIELD.getPreferredName() + " is missing", e); } - - final int actualMaxBatchOperationCount = - maxBatchOperationCount == null ? DEFAULT_MAX_BATCH_OPERATION_COUNT : maxBatchOperationCount; - if (actualMaxBatchOperationCount < 1) { - throw new IllegalArgumentException(MAX_BATCH_OPERATION_COUNT.getPreferredName() + " must be larger than 0"); + if (maxBatchOperationCount != null && maxBatchOperationCount < 1) { + e = addValidationError(MAX_BATCH_OPERATION_COUNT.getPreferredName() + " must be larger than 0", e); } - - final int actualMaxConcurrentReadBatches = - maxConcurrentReadBatches == null ? DEFAULT_MAX_CONCURRENT_READ_BATCHES : maxConcurrentReadBatches; - if (actualMaxConcurrentReadBatches < 1) { - throw new IllegalArgumentException(MAX_CONCURRENT_READ_BATCHES.getPreferredName() + " must be larger than 0"); + if (maxConcurrentReadBatches != null && maxConcurrentReadBatches < 1) { + e = addValidationError(MAX_CONCURRENT_READ_BATCHES.getPreferredName() + " must be larger than 0", e); } - - final long actualMaxOperationSizeInBytes = - maxOperationSizeInBytes == null ? DEFAULT_MAX_BATCH_SIZE_IN_BYTES : maxOperationSizeInBytes; - if (actualMaxOperationSizeInBytes <= 0) { - throw new IllegalArgumentException(MAX_BATCH_SIZE_IN_BYTES.getPreferredName() + " must be larger than 0"); + if (maxOperationSizeInBytes != null && maxOperationSizeInBytes <= 0) { + e = addValidationError(MAX_BATCH_SIZE_IN_BYTES.getPreferredName() + " must be larger than 0", e); } - - final int actualMaxConcurrentWriteBatches = - maxConcurrentWriteBatches == null ? DEFAULT_MAX_CONCURRENT_WRITE_BATCHES : maxConcurrentWriteBatches; - if (actualMaxConcurrentWriteBatches < 1) { - throw new IllegalArgumentException(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0"); + if (maxConcurrentWriteBatches != null && maxConcurrentWriteBatches < 1) { + e = addValidationError(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName() + " must be larger than 0", e); } - - final int actualMaxWriteBufferSize = maxWriteBufferSize == null ? DEFAULT_MAX_WRITE_BUFFER_SIZE : maxWriteBufferSize; - if (actualMaxWriteBufferSize < 1) { - throw new IllegalArgumentException(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0"); + if (maxWriteBufferSize != null && maxWriteBufferSize < 1) { + e = addValidationError(MAX_WRITE_BUFFER_SIZE.getPreferredName() + " must be larger than 0", e); } - - final TimeValue actualRetryTimeout = maxRetryDelay == null ? DEFAULT_MAX_RETRY_DELAY : maxRetryDelay; - final TimeValue actualPollTimeout = pollTimeout == null ? DEFAULT_POLL_TIMEOUT : pollTimeout; - - this.leaderIndex = leaderIndex; - this.followerIndex = followerIndex; - this.maxBatchOperationCount = actualMaxBatchOperationCount; - this.maxConcurrentReadBatches = actualMaxConcurrentReadBatches; - this.maxOperationSizeInBytes = actualMaxOperationSizeInBytes; - this.maxConcurrentWriteBatches = actualMaxConcurrentWriteBatches; - this.maxWriteBufferSize = actualMaxWriteBufferSize; - this.maxRetryDelay = actualRetryTimeout; - this.pollTimeout = actualPollTimeout; - } - - public Request() { - - } - - @Override - public ActionRequestValidationException validate() { - ActionRequestValidationException validationException = null; - - if (maxRetryDelay.millis() <= 0) { + if (maxRetryDelay != null && maxRetryDelay.millis() <= 0) { String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be positive but was [" + maxRetryDelay.getStringRep() + "]"; - validationException = addValidationError(message, validationException); + e = addValidationError(message, e); } - if (maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) { + if (maxRetryDelay != null && maxRetryDelay.millis() > FollowIndexAction.MAX_RETRY_DELAY.millis()) { String message = "[" + MAX_RETRY_DELAY_FIELD.getPreferredName() + "] must be less than [" + MAX_RETRY_DELAY + "] but was [" + maxRetryDelay.getStringRep() + "]"; - validationException = addValidationError(message, validationException); + e = addValidationError(message, e); } - return validationException; + return e; } @Override @@ -246,11 +224,11 @@ public void readFrom(final StreamInput in) throws IOException { super.readFrom(in); leaderIndex = in.readString(); followerIndex = in.readString(); - maxBatchOperationCount = in.readVInt(); - maxConcurrentReadBatches = in.readVInt(); - maxOperationSizeInBytes = in.readVLong(); - maxConcurrentWriteBatches = in.readVInt(); - maxWriteBufferSize = in.readVInt(); + maxBatchOperationCount = in.readOptionalVInt(); + maxConcurrentReadBatches = in.readOptionalVInt(); + maxOperationSizeInBytes = in.readOptionalLong(); + maxConcurrentWriteBatches = in.readOptionalVInt(); + maxWriteBufferSize = in.readOptionalVInt(); maxRetryDelay = in.readOptionalTimeValue(); pollTimeout = in.readOptionalTimeValue(); } @@ -260,11 +238,11 @@ public void writeTo(final StreamOutput out) throws IOException { super.writeTo(out); out.writeString(leaderIndex); out.writeString(followerIndex); - out.writeVInt(maxBatchOperationCount); - out.writeVInt(maxConcurrentReadBatches); - out.writeVLong(maxOperationSizeInBytes); - out.writeVInt(maxConcurrentWriteBatches); - out.writeVInt(maxWriteBufferSize); + out.writeOptionalVInt(maxBatchOperationCount); + out.writeOptionalVInt(maxConcurrentReadBatches); + out.writeOptionalLong(maxOperationSizeInBytes); + out.writeOptionalVInt(maxConcurrentWriteBatches); + out.writeOptionalVInt(maxWriteBufferSize); out.writeOptionalTimeValue(maxRetryDelay); out.writeOptionalTimeValue(pollTimeout); } @@ -275,13 +253,27 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa { builder.field(LEADER_INDEX_FIELD.getPreferredName(), leaderIndex); builder.field(FOLLOWER_INDEX_FIELD.getPreferredName(), followerIndex); - builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); - builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); - builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); - builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); - builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); - builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); - builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); + if (maxBatchOperationCount != null) { + builder.field(MAX_BATCH_OPERATION_COUNT.getPreferredName(), maxBatchOperationCount); + } + if (maxOperationSizeInBytes != null) { + builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxOperationSizeInBytes); + } + if (maxWriteBufferSize != null) { + builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize); + } + if (maxConcurrentReadBatches != null) { + builder.field(MAX_CONCURRENT_READ_BATCHES.getPreferredName(), maxConcurrentReadBatches); + } + if (maxConcurrentWriteBatches != null) { + builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches); + } + if (maxRetryDelay != null) { + builder.field(MAX_RETRY_DELAY_FIELD.getPreferredName(), maxRetryDelay.getStringRep()); + } + if (pollTimeout != null) { + builder.field(POLL_TIMEOUT.getPreferredName(), pollTimeout.getStringRep()); + } } builder.endObject(); return builder; @@ -292,11 +284,11 @@ public boolean equals(final Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Request request = (Request) o; - return maxBatchOperationCount == request.maxBatchOperationCount && - maxConcurrentReadBatches == request.maxConcurrentReadBatches && - maxOperationSizeInBytes == request.maxOperationSizeInBytes && - maxConcurrentWriteBatches == request.maxConcurrentWriteBatches && - maxWriteBufferSize == request.maxWriteBufferSize && + return Objects.equals(maxBatchOperationCount, request.maxBatchOperationCount) && + Objects.equals(maxConcurrentReadBatches, request.maxConcurrentReadBatches) && + Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) && + Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) && + Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) && Objects.equals(maxRetryDelay, request.maxRetryDelay) && Objects.equals(pollTimeout, request.pollTimeout) && Objects.equals(leaderIndex, request.leaderIndex) && From 5c16c1c15247f0cff8ed5620017275390caf5c54 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Tue, 18 Sep 2018 16:41:59 +0200 Subject: [PATCH 2/2] whoops --- .../org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java | 4 ++-- .../test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java | 4 ++-- .../elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java index 60b9f8f23e8b3..732f8527dc188 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java @@ -199,13 +199,13 @@ private static void refresh(String index) throws IOException { private static void followIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java index c7ecbe184de88..ed0971f18250f 100644 --- a/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java +++ b/x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java @@ -127,13 +127,13 @@ private static void refresh(String index) throws IOException { private static void followIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } private static void createAndFollowIndex(String leaderIndex, String followIndex) throws IOException { final Request request = new Request("POST", "/" + followIndex + "/_ccr/create_and_follow"); - request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"idle_shard_retry_delay\": \"10ms\"}"); + request.setJsonEntity("{\"leader_index\": \"" + leaderIndex + "\", \"poll_timeout\": \"10ms\"}"); assertOK(client().performRequest(request)); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java index 74c189d8b0334..46679d22520c3 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java @@ -304,7 +304,7 @@ private void followLeaderIndex(String clusterAlias, Index indexToFollow, request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches()); request.setMaxOperationSizeInBytes(pattern.getMaxOperationSizeInBytes()); request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches()); - request.setMaxWriteBufferSize(request.getMaxWriteBufferSize()); + request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize()); request.setMaxRetryDelay(pattern.getMaxRetryDelay()); request.setPollTimeout(pattern.getIdleShardRetryDelay());